# Owner(s): ["module: custom-operators"]
# ruff: noqa: F841

import collections
import io
import itertools
import os
import re
import subprocess
import sys
import tempfile
import typing
import unittest
from functools import partial
from pathlib import Path
from typing import *  # noqa: F403

import numpy as np
import yaml

import torch._custom_ops as custom_ops
import torch.testing._internal.optests as optests
import torch.utils._pytree as pytree
import torch.utils.cpp_extension
from torch import Tensor
from torch._custom_op.impl import CustomOp, infer_schema
from torch._library.fake_profile import (
    generate_yaml_from_profiles,
    load_op_profiles,
    MissingOpProfile,
    OpProfile,
    read_profiles_from_yaml,
    save_op_profiles,
    TensorMetadata,
)
from torch._library.infer_schema import tuple_to_list
from torch._library.opaque_object import make_opaque, OpaqueType
from torch._utils_internal import get_file_path_2  # @manual
from torch.fx.experimental.proxy_tensor import make_fx
from torch.fx.experimental.symbolic_shapes import ShapeEnv
from torch.testing._internal import custom_op_db
from torch.testing._internal.common_cuda import TEST_CUDA
from torch.testing._internal.common_device_type import (
    instantiate_device_type_tests,
    OpDTypes,
    ops,
)
from torch.testing._internal.common_utils import (
    instantiate_parametrized_tests,
    IS_WINDOWS,
    parametrize,
    run_tests,
    scoped_load_inline,
    skipIfTorchDynamo,
    subtest,
    TemporaryFileName,
    TestCase,
)
from torch.testing._internal.custom_op_db import numpy_nonzero
from torch.testing._internal.two_tensor import TwoTensor


# Shadowed by `torch.testing._internal.common_utils.custom_op`
from torch._custom_op.impl import custom_op  # usort: skip

# Needed by TestTypeConversion.test_string_type:
MyList = list
MyTensor = torch.Tensor


def requires_compile(fun):
    fun = unittest.skipIf(IS_WINDOWS, "torch.compile doesn't work with windows")(fun)
    return fun


class CustomOpTestCaseBase(TestCase):
    test_ns = "_test_custom_op"

    def setUp(self):
        super().setUp()
        self.libraries = []

    def tearDown(self):
        super().tearDown()
        import torch._custom_op

        keys = list(torch._custom_op.impl.global_registry.keys())
        for key in keys:
            if not key.startswith(f"{self.test_ns}::"):
                continue
            torch._custom_op.impl.global_registry[key]._destroy()
        if hasattr(torch.ops, self.test_ns):
            delattr(torch.ops, self.test_ns)
        for lib in self.libraries:
            lib._destroy()
        del self.libraries

    def ns(self):
        return getattr(torch.ops, self.test_ns)

    def lib(self):
        result = torch.library.Library(self.test_ns, "FRAGMENT")  # noqa: TOR901
        self.libraries.append(result)
        return result

    def get_op(self, qualname):
        return torch._custom_op.impl.get_op(qualname)


@requires_compile
class TestCustomOpTesting(CustomOpTestCaseBase):
    @parametrize("check_gradients", (False, "auto"))
    @parametrize("dynamic", (True, False))
    def test_aot_autograd_check_degenerate_cases(
        self, device, dynamic, check_gradients
    ):
        def simple(x):
            return x.clone()

        # Should not raise
        x = torch.randn(3, device=device)
        optests.aot_autograd_check(
            simple, (x,), {}, dynamic=dynamic, check_gradients=check_gradients
        )

        def outputs_dont_require_grad(x):
            return x.detach()

        # Should not raise
        y = torch.randn(3, device=device, requires_grad=True)
        optests.aot_autograd_check(
            simple, (y,), {}, dynamic=dynamic, check_gradients=check_gradients
        )

        def no_outputs(x):
            return x.detach()

        # Should not raise
        x = torch.randn(3, device=device, requires_grad=True)
        y = torch.randn(3, device=device, requires_grad=False)
        optests.aot_autograd_check(
            no_outputs, (x,), {}, dynamic=dynamic, check_gradients=check_gradients
        )
        optests.aot_autograd_check(
            no_outputs, (y,), {}, dynamic=dynamic, check_gradients=check_gradients
        )

    def test_incorrect_schema_mutation(self, device):
        lib = self.lib()
        lib.define("foo(Tensor x) -> Tensor")
        op = self.ns().foo.default

        class Foo(torch.autograd.Function):
            @staticmethod
            def forward(ctx, x):
                guard = torch._C._AutoDispatchBelowAutograd()
                try:
                    return op(x)
                finally:
                    del guard

            @staticmethod
            def backward(ctx, gx):
                return gx

        def foo_impl(x):
            x.sin_()
            return x.clone()

        lib.impl("foo", Foo.apply, "Autograd")
        lib.impl("foo", foo_impl, "CPU")
        lib.impl("foo", foo_impl, "CUDA")
        lib.impl("foo", foo_impl, "XPU")

        x = torch.tensor(3.14159 / 3, requires_grad=True, device=device)
        with self.assertRaisesRegex(
            optests.OpCheckError, "Argument x is not defined as mutable but was mutated"
        ):
            torch.library.opcheck(op, (x,), {})

    def test_incorrect_schema_view(self, device):
        lib = self.lib()
        lib.define("foo(Tensor x) -> Tensor")
        op = self.ns().foo.default

        class Foo(torch.autograd.Function):
            @staticmethod
            def forward(ctx, x):
                # Emulate AutoDispatchBelowADInplaceOrView, which is not bound into python
                with torch._C._AutoDispatchBelowAutograd():
                    with torch._C._ExcludeDispatchKeyGuard(
                        torch._C.DispatchKeySet(torch._C.DispatchKey.ADInplaceOrView)
                    ):
                        return op(x)

            @staticmethod
            def backward(ctx, gx):
                return gx

        def foo_impl(x):
            return x.view_as(x)

        def foo_meta(x):
            return x.view_as(x)

        lib.impl("foo", Foo.apply, "Autograd")
        lib.impl("foo", foo_impl, "CPU")
        lib.impl("foo", foo_meta, "Meta")

        x = torch.tensor(3.14159 / 3, requires_grad=True)
        with self.assertRaisesRegex(
            optests.OpCheckError,
            "Argument x is not defined to alias output but was aliasing",
        ):
            torch.library.opcheck(op, (x,), {})

    # https://github.com/pytorch/pytorch/issues/142410
    def test_opcheck_unbacked_stride(self, device):
        @torch.library.custom_op("test::f", mutates_args=[])
        def f(x: torch.Tensor) -> torch.Tensor:
            return x.new_zeros((x.size(0), 18))

        @f.register_fake
        def _(x: torch.Tensor) -> torch.Tensor:
            ctx = torch.library.get_ctx()
            s = ctx.new_dynamic_size()
            return torch.empty(x.shape[0], s, device=x.device, dtype=x.dtype)

        example = torch.zeros([10, 20], device=device)
        torch.library.opcheck(f, args=[example])

    # https://github.com/pytorch/pytorch/issues/150472
    def test_single_element_tuple_output(self, device):
        # Helper function to register id_tuple custom and the fake tensor implementation
        # so that Dynamo has the fake tensor implementation
        def get_id_tuple():
            @torch.library.custom_op("test::id_tuple", mutates_args=[])
            def id_tuple(x: torch.Tensor) -> Tuple[torch.Tensor]:
                return (x.clone(),)

            @id_tuple.register_fake
            def _(
                x: torch.Tensor,
            ) -> Tuple[torch.Tensor]:
                return (x.clone(),)

            return id_tuple

        id_tuple = get_id_tuple()
        x = torch.randn(3, device=device)
        ret = id_tuple(x)
        # Check if ret is a tuple and has exactly one and the same element
        self.assertIsInstance(ret, tuple)
        self.assertEqual(len(ret), 1)
        self.assertEqual(x, ret[0])

    def test_missing_abstract_impl(self, device):
        lib = self.lib()
        lib.define("foo(Tensor x) -> Tensor")
        op = self.ns().foo.default

        class Foo(torch.autograd.Function):
            @staticmethod
            def forward(ctx, x):
                with torch._C._AutoDispatchBelowAutograd():
                    return op(x)

            @staticmethod
            def backward(ctx, gx):
                return 2 * gx

        def foo_impl(x):
            return torch.tensor(x.cpu().numpy() ** 2, device=x.device)

        lib.impl("foo", Foo.apply, "Autograd")
        lib.impl("foo", foo_impl, "CPU")
        lib.impl("foo", foo_impl, "CUDA")
        lib.impl("foo", foo_impl, "XPU")

        x = torch.tensor([0, 1.0], requires_grad=True)
        with self.assertRaisesRegex(
            optests.OpCheckError,
            "_test_custom_op.foo.default",
        ):
            torch.library.opcheck(op, (x,), {})

    @skipIfTorchDynamo("Expected to fail due to no FakeTensor support; not a bug")
    def test_incorrect_abstract_impl(self, device):
        lib = self.lib()
        lib.define("foo(Tensor x) -> Tensor")
        op = self.ns().foo.default

        class Foo(torch.autograd.Function):
            @staticmethod
            def forward(ctx, x):
                # Emulate AutoDispatchBelowADInplaceOrView, which is not bound into python
                guard = torch._C._AutoDispatchBelowAutograd()
                guard2 = torch._C.ExcludeDispatchKeyGuard(
                    torch._C.DispatchKeySet(torch._C.DispatchKey.ADInplaceOrView)
                )
                try:
                    return op(x)
                finally:
                    del guard
                    del guard2

            @staticmethod
            def backward(ctx, gx):
                return gx

        def foo_impl(x):
            return x**2

        def foo_meta(x):
            return x.unsqueeze(1) ** 2

        lib.impl("foo", Foo.apply, "Autograd")
        lib.impl("foo", foo_impl, "CPU")
        lib.impl("foo", foo_impl, "CUDA")
        lib.impl("foo", foo_impl, "XPU")
        lib.impl("foo", foo_meta, "Meta")

        x = torch.tensor([0, 1.0], requires_grad=True)
        with self.assertRaisesRegex(optests.OpCheckError, "Shapes .* are not equal"):
            torch.library.opcheck(op, (x,), {})

    def test_missing_functionalization(self, device):
        lib = self.lib()
        lib.define("foo(Tensor(a!) x) -> Tensor(a!)")
        op = self.ns().foo.default

        class Foo(torch.autograd.Function):
            @staticmethod
            def forward(ctx, x):
                ctx.mark_dirty(x)
                with torch._C._AutoDispatchBelowAutograd():
                    return op(x)

            @staticmethod
            def backward(ctx, gx):
                return gx

        def foo_impl(x):
            return x.sin_()

        def foo_meta(x):
            return x

        lib.impl("foo", Foo.apply, "Autograd")
        lib.impl("foo", foo_impl, "CPU")
        lib.impl("foo", foo_impl, "CUDA")
        lib.impl("foo", foo_impl, "XPU")
        lib.impl("foo", foo_meta, "Meta")

        x = torch.tensor([0, 1.0])
        y = x.clone()
        with self.assertRaisesRegex(
            optests.OpCheckError,
            "We only support functionalizing operators whose outputs do not have alias annotations",
        ):
            torch.library.opcheck(op, (y,), {})

    def test_autograd_registered_at_backend(self, device):
        lib = self.lib()
        lib.define("foo(Tensor x) -> Tensor")
        op = self.ns().foo.default

        class Foo(torch.autograd.Function):
            @staticmethod
            def forward(ctx, x):
                return x.clone()

            @staticmethod
            def backward(ctx, gx):
                return gx * 0.5

        lib.impl("foo", Foo.apply, "CPU")
        lib.impl("foo", Foo.apply, "CUDA")
        lib.impl("foo", Foo.apply, "XPU")
        lib.impl("foo", lambda x: x.clone(), "Meta")

        x = torch.randn([], requires_grad=True)

        with self.assertRaisesRegex(
            torch.testing._internal.optests.OpCheckError,
            "does not have an autograd kernel",
        ):
            torch.library.opcheck(op, (x,), {})

        # I'm not sure why this is necessary
        del lib

    def test_global_state_mutation(self, device):
        lib = self.lib()
        lib.define("foo(Tensor x) -> Tensor")
        op = self.ns().foo.default

        class Foo(torch.autograd.Function):
            invoked = 0

            @staticmethod
            def forward(ctx, x):
                Foo.invoked += 1
                return x.clone() * Foo.invoked

            @staticmethod
            def backward(ctx, gx):
                return gx

        lib.impl("foo", Foo.apply, "CompositeImplicitAutograd")

        x = torch.tensor(3.14159 / 3, requires_grad=True)
        with self.assertRaisesRegex(
            optests.OpCheckError, "eager-mode PyTorch vs AOTDispatcher"
        ):
            torch.library.opcheck(op, (x,), {})

        # Test that we can actually see the absolute difference numbers
        try:
            torch.library.opcheck(op, (x,), {})
        except optests.OpCheckError as err:
            orig = err.__context__.__context__
            self.assertIn("Absolute difference:", str(orig))

        # Test atol/rtol overrides
        torch.library.opcheck(op, (x,), {}, atol=3, rtol=0.01)

    @ops(custom_op_db.custom_op_db, dtypes=OpDTypes.any_one)
    def test_opcheck_opinfo(self, device, dtype, op):
        for sample_input in op.sample_inputs(
            device, dtype, requires_grad=op.supports_autograd
        ):
            args = [sample_input.input] + list(sample_input.args)
            kwargs = sample_input.kwargs
            torch.library.opcheck(op.op, args, kwargs)

    def test_opcheck_fails_basic(self, device):
        @custom_op(f"{self.test_ns}::foo")
        def foo(x: torch.Tensor) -> torch.Tensor: ...

        @foo.impl(["cpu", "cuda"])
        def foo_impl(x):
            return x.sum()

        x = torch.randn(3, device=device, requires_grad=True)
        # Triggers the CustomOp autograd NYI error
        with self.assertRaisesRegex(
            optests.OpCheckError, "Autograd has not been implemented for operator"
        ):
            torch.library.opcheck(self.get_op(f"{self.test_ns}::foo"), (x,), {})

    def test_autograd_registration_check_autograd_kernel(self, device):
        lib = self.lib()
        lib.define("foo(Tensor x) -> Tensor")
        op = self.ns().foo.default

        class Foo(torch.autograd.Function):
            @staticmethod
            def forward(ctx, x):
                with torch._C._AutoDispatchBelowAutograd():
                    return op(x)

            @staticmethod
            def backward(ctx, gx):
                return gx

        def foo_impl(x):
            return x.sin()

        lib.impl("foo", Foo.apply, "Autograd")
        lib.impl("foo", foo_impl, "CPU")
        lib.impl("foo", foo_impl, "CUDA")
        lib.impl("foo", foo_impl, "XPU")

        x = torch.randn(3, requires_grad=True, device=device)
        # Should not raise
        optests.autograd_registration_check(op, (x,), {})

    def test_autograd_registration_check_compositeimplicitautograd(self, device):
        lib = self.lib()
        lib.define("foo(Tensor x) -> Tensor")
        op = self.ns().foo.default

        def foo_impl(x):
            return x.sin().cos()

        lib.impl("foo", foo_impl, "CompositeImplicitAutograd")

        x = torch.randn(3, requires_grad=True, device=device)
        # Should not raise
        optests.autograd_registration_check(op, (x,), {})

    def test_autograd_registration_check_incorrect_composite(self, device):
        lib = self.lib()
        lib.define("foo(Tensor x) -> Tensor")
        op = self.ns().foo.default

        def foo_impl(x):
            return x.sin().cos()

        lib.impl("foo", foo_impl, "CompositeExplicitAutograd")

        x = torch.randn(3, requires_grad=True, device=device)
        with self.assertRaisesRegex(AssertionError, "incorrectly registered"):
            optests.autograd_registration_check(op, (x,), {})

    def test_autograd_registration_check_incorrect(self, device):
        lib = self.lib()
        lib.define("foo(Tensor x) -> Tensor")
        op = self.ns().foo.default

        class Foo(torch.autograd.Function):
            @staticmethod
            def forward(ctx, x):
                return torch.sin(x)

            @staticmethod
            def backward(ctx, gx):
                return gx

        lib.impl("foo", Foo.apply, "CPU")
        lib.impl("foo", Foo.apply, "CUDA")
        lib.impl("foo", Foo.apply, "XPU")

        x = torch.randn(3, requires_grad=True, device=device)
        with self.assertRaisesRegex(AssertionError, "incorrectly registered"):
            optests.autograd_registration_check(op, (x,), {})

    def test_assert_raises_regex(self, device):
        from torch.testing._internal.optests.aot_autograd import assert_raises_regex

        with assert_raises_regex(RuntimeError, "c"):
            raise RuntimeError("abcd")
        with assert_raises_regex(RuntimeError, "c.*"):
            raise RuntimeError("abcd")
        with self.assertRaisesRegex(AssertionError, "instead got"):
            with assert_raises_regex(RuntimeError, "c.*"):
                raise ValueError("abcd")
        with self.assertRaisesRegex(AssertionError, "Expected exception"):
            with assert_raises_regex(RuntimeError, "c.*"):
                pass
        with self.assertRaisesRegex(AssertionError, "to match regex"):
            with assert_raises_regex(RuntimeError, "f"):
                raise RuntimeError("abcd")


class TestCustomOp(CustomOpTestCaseBase):
    test_ns = "_test_custom_op"

    @requires_compile
    def test_functionalize_error(self):
        with torch.library._scoped_library(self.test_ns, "FRAGMENT") as lib:
            lib.define("foo(Tensor(a!) x) -> Tensor(a!)")

            def foo(x):
                return x.sin_()

            lib.impl("foo", foo, "CompositeExplicitAutograd")
            foo_op = self.get_op(f"{self.test_ns}::foo")

            lib.define("bar(Tensor(a) x) -> Tensor(a)")

            def bar(x):
                return x.view(-1)

            lib.impl("bar", bar, "CompositeExplicitAutograd")
            bar_op = self.get_op(f"{self.test_ns}::bar")

            msg = r".*We only support functionalizing operators whose outputs do not have alias annotations"

            x = torch.randn(3)

            @torch.compile(backend="aot_eager", fullgraph=True)
            def f(x):
                return foo_op(x)

            @torch.compile(backend="aot_eager", fullgraph=True)
            def g(x):
                return bar_op(x)

            with self.assertRaisesRegex(RuntimeError, msg):
                f(x)
            with self.assertRaisesRegex(RuntimeError, msg):
                g(x)

    def test_invalid_schemas(self):
        # function schema validation goes through torchgen, so this is just a
        # basic test.
        with self.assertRaisesRegex(AssertionError, "Invalid function schema: foo"):
            custom_ops.custom_op(f"{TestCustomOp.test_ns}::foo", "(")

    def test_invalid_qualname(self):
        with self.assertRaisesRegex(ValueError, "overload"):
            custom_ops.custom_op(f"{TestCustomOp.test_ns}::foo.Tensor", "() -> ()")

    def test_name_must_match(self):
        with self.assertRaisesRegex(ValueError, "to have name"):

            @custom_ops.custom_op(f"{TestCustomOp.test_ns}::foo")
            def baz(x: Tensor) -> Tensor:
                raise NotImplementedError

    def test_unsupported_schemas(self):
        with self.assertRaisesRegex(ValueError, "only supports functional"):
            custom_ops.custom_op(
                f"{TestCustomOp.test_ns}::foo", "(Tensor(a!) x) -> Tensor(a)"
            )(foo)
        with self.assertRaisesRegex(ValueError, "only supports functional"):
            custom_ops.custom_op(
                f"{TestCustomOp.test_ns}::foo", "(Tensor(a) x) -> Tensor(a)"
            )(foo)
        with self.assertRaisesRegex(ValueError, "only supports functional"):
            custom_ops.custom_op(f"{TestCustomOp.test_ns}::foo", "(Tensor x) -> ()")(
                foo
            )
        with self.assertRaisesRegex(ValueError, "self"):
            custom_ops.custom_op(f"{TestCustomOp.test_ns}::foo", "(Tensor self) -> ()")(
                foo
            )

    # Tests for the older custom_op API
    def test_schema_matches_signature(self):
        with self.assertRaisesRegex(ValueError, "signature to match"):

            @custom_op(f"{TestCustomOp.test_ns}::blah", "(Tensor y) -> Tensor")
            def blah(x):
                pass

        with self.assertRaisesRegex(ValueError, "signature to match"):

            @custom_op(
                f"{TestCustomOp.test_ns}::blah2", "(Tensor x, *, Tensor y) -> Tensor"
            )
            def blah2(x, y):
                pass

        with self.assertRaisesRegex(ValueError, "signature to match"):

            @custom_op(
                f"{TestCustomOp.test_ns}::blah3",
                "(Tensor x, *, Tensor w, Tensor z) -> Tensor",
            )
            def blah3(x, *, y, z):
                pass

        with self.assertRaisesRegex(ValueError, "signature to match"):

            @custom_op(
                f"{TestCustomOp.test_ns}::blah4",
                "(Tensor x, *, Tensor z, Tensor y) -> Tensor",
            )
            def blah4(x, *, y, z):
                pass

        with self.assertRaisesRegex(ValueError, "not supported"):

            @custom_op(f"{TestCustomOp.test_ns}::blah5", "(Tensor x) -> Tensor")
            def blah5(*args):
                pass

        with self.assertRaisesRegex(ValueError, "not supported"):

            @custom_op(
                f"{TestCustomOp.test_ns}::blah6", "(*, Tensor z, Tensor y) -> Tensor"
            )
            def blah6(**kwargs):
                pass

        with self.assertRaisesRegex(ValueError, "default arguments"):

            @custom_op(
                f"{TestCustomOp.test_ns}::blah7", "(Tensor x, *, Tensor y) -> Tensor"
            )
            def blah7(x=1, *, y):
                pass

        with self.assertRaisesRegex(ValueError, "default arguments"):

            @custom_op(
                f"{TestCustomOp.test_ns}::blah8", "(Tensor x, *, Tensor y) -> Tensor"
            )
            def blah8(x, *, y=1):
                pass

        # kwonly-arg works
        @custom_op(
            f"{TestCustomOp.test_ns}::blah9", "(Tensor x, *, Tensor y) -> Tensor"
        )
        def blah9(x, *, y):
            pass

    def test_infer_schema_no_return(self):
        with self.assertRaisesRegex(
            ValueError, "No return type annotation was provided. Please add one."
        ):

            @torch.library.custom_op("mylib::foo", mutates_args={})
            def foo(x: torch.Tensor, y: int):
                return x * y

    def test_infer_schema_supported(self):
        def a(x: Tensor) -> Tensor:
            return torch.empty([])

        self.assertExpectedInline(
            infer_schema(a, mutates_args=()), """(Tensor x) -> Tensor"""
        )

        def kwonly1(x: Tensor, *, y: int, z: float) -> Tensor:
            return torch.empty([])

        self.assertExpectedInline(
            infer_schema(kwonly1, mutates_args=()),
            """(Tensor x, *, SymInt y, float z) -> Tensor""",
        )

        def kwonly2(*, y: Tensor) -> Tensor:
            return torch.empty([])

        self.assertExpectedInline(
            infer_schema(kwonly2, mutates_args=()), """(*, Tensor y) -> Tensor"""
        )

        def b(
            x: Tensor,
            y: int,
            z: bool,
            a: float,
            b: torch.dtype,
            c: torch.device,
            d: torch.types.Number,
        ) -> Tuple[Tensor, int, float, bool]:
            return torch.empty([]), 1, 0.1, True

        self.assertExpectedInline(
            infer_schema(b, mutates_args=()),
            """(Tensor x, SymInt y, bool z, float a, ScalarType b, Device c, Scalar d) -> (Tensor, SymInt, float, bool)""",
        )

        def c(
            x: Tensor,
            y: Sequence[Tensor],
            z: Optional[Tensor],
            w: Sequence[Optional[Tensor]],
        ) -> List[Tensor]:
            return [torch.empty([])]

        self.assertExpectedInline(
            infer_schema(c, mutates_args=()),
            """(Tensor x, Tensor[] y, Tensor? z, Tensor?[] w) -> Tensor[]""",
        )

        def d(x: Tensor) -> Tuple[List[Tensor], Tensor]:
            return [torch.empty([])], torch.empty([])

        self.assertExpectedInline(
            infer_schema(d, mutates_args=()), """(Tensor x) -> (Tensor[], Tensor)"""
        )

        def e() -> Tensor:
            return torch.empty([])

        self.assertExpectedInline(infer_schema(e, mutates_args=()), """() -> Tensor""")

        def f(x: Tensor) -> None:
            pass

        self.assertExpectedInline(
            infer_schema(f, mutates_args=()), """(Tensor x) -> ()"""
        )

        def g(
            x: Tensor, y: List[Tensor], z: List[Tensor], w: List[Optional[Tensor]]
        ) -> None:
            pass

        self.assertExpectedInline(
            infer_schema(g, mutates_args=()),
            """(Tensor x, Tensor[] y, Tensor[] z, Tensor?[] w) -> ()""",
        )

        self.assertExpectedInline(
            infer_schema(g, mutates_args={"x", "w", "z"}),
            """(Tensor(a0!) x, Tensor[] y, Tensor(a2!)[] z, Tensor(a3!)?[] w) -> ()""",
        )

        self.assertExpectedInline(
            infer_schema(g, mutates_args="unknown"),
            """(Tensor(a0!) x, Tensor(a1!)[] y, Tensor(a2!)[] z, Tensor(a3!)?[] w) -> ()""",
        )

        def h(
            x: Tensor,
            a: Optional[int] = None,
            b: float = 3.14,
            c: bool = True,
            d: int = 3,
            e: str = "foo",
            f: torch.dtype = torch.float,
            g: torch.dtype = torch.float32,
            h: torch.dtype = torch.int,
            i: torch.device = torch.device("cpu:0"),
            j: torch.device = "cpu",
        ) -> None:
            pass

        self.assertExpectedInline(
            infer_schema(h, mutates_args=()),
            (
                """(Tensor x, SymInt? a=None, float b=3.14, bool c=True, SymInt d=3, str e="foo", """
                """ScalarType f=float32, ScalarType g=float32, ScalarType h=int32, Device i="cpu:0", Device j="cpu") -> ()"""
            ),
        )

        def foo_impl(x: torch.Tensor) -> torch.Tensor:
            return x.sin()

        schema = torch.library.infer_schema(foo_impl, op_name="myop", mutates_args={})
        self.assertExpectedInline(schema, "myop(Tensor x) -> Tensor")

        # Ensure that a global in this file is properly found & evaluated.
        def stringy_fn(x: torch.Tensor) -> "MyList[torch.Tensor]":
            return [torch.randn_like(x)]

        schema = infer_schema(stringy_fn, mutates_args={})
        self.assertExpectedInline(schema, "(Tensor x) -> Tensor[]")

        # Make sure that substrings are evaluated properly.
        def substringy_fn(
            x: torch.Tensor,
        ) -> list["MyTensor"]:
            return [torch.randn_like(x)]

        schema = infer_schema(substringy_fn, mutates_args={})
        self.assertExpectedInline(schema, "(Tensor x) -> Tensor[]")

    def test_infer_schema_unsupported(self):
        with self.assertRaisesRegex(ValueError, "varargs"):

            def foo(*args):
                raise NotImplementedError

            infer_schema(foo, mutates_args=())

        with self.assertRaisesRegex(ValueError, "varkwargs"):

            def foo(**kwargs):
                raise NotImplementedError

            infer_schema(foo, mutates_args=())

        with self.assertRaisesRegex(ValueError, "must have a type annotation"):

            def foo(x):
                raise NotImplementedError

            infer_schema(foo, mutates_args=())

        with self.assertRaisesRegex(ValueError, "unsupported"):

            def foo(x: Tensor) -> Tuple[Tensor, ...]:
                raise NotImplementedError

            infer_schema(foo, mutates_args=())

        with self.assertRaisesRegex(ValueError, "can be mutated"):

            def foo(x: Tensor, y: int) -> Tensor:
                raise NotImplementedError

            infer_schema(foo, mutates_args={"y"})

        # Ensure that a global defined in infer_schema's file ISN'T found.
        with self.assertRaisesRegex(
            ValueError,
            r"Unsupported type annotation list\[_TestTensor\]\. It is not a type\.",
        ):

            def stringy_bad_type(
                x: torch.Tensor,
            ) -> "list[_TestTensor]":
                return [torch.randn_like(x)]

            self.assertTrue(hasattr(torch._library.infer_schema, "_TestTensor"))
            schema = infer_schema(stringy_bad_type, mutates_args={})

    def _generate_examples(self, typ):
        if typ is int:
            return [17]
        if typ is float:
            return [3.14]
        if typ is bool:
            return [True]
        if typ is str:
            return ["foo"]
        if typ is torch.dtype:
            return [torch.float32]
        if typ is torch.device:
            return [torch.device("cpu")]
        if typ == torch.types.Number:
            return [2.718]
        if typ is torch.Tensor:
            return [torch.tensor(3)]
        if typ == Optional[torch.types.Number]:
            return [None, 2.718]
        if typ == OpaqueType:
            return [make_opaque("moo")]
        origin = typing.get_origin(typ)
        if origin is Union:
            args = typing.get_args(typ)
            assert len(args) == 2 and (args[0] is type(None) or args[1] is type(None))
            elt = args[0] if args[1] is type(None) else args[1]
            return self._generate_examples(elt) + [None]
        if origin is list:
            args = typing.get_args(typ)
            assert len(args) == 1
            elt = args[0]
            return [
                self._generate_examples(elt),
                self._generate_examples(elt),
                self._generate_examples(elt),
            ]
        if origin is collections.abc.Sequence:
            args = typing.get_args(typ)
            assert len(args) == 1
            examples = self._generate_examples(args[0])
            return list(itertools.product(examples, examples)) + []
        raise NotImplementedError(
            f"testrunner cannot generate instanstance of type {typ}"
        )

    def test_supported_return_types_single_return(self):
        for typ in torch._library.infer_schema.SUPPORTED_RETURN_TYPES:
            for example in self._generate_examples(typ):
                try:

                    @custom_ops.custom_op(f"{self.test_ns}::foo")
                    def foo(x: Tensor) -> typ:
                        raise NotImplementedError

                    @custom_ops.impl(f"{self.test_ns}::foo")
                    def foo_impl(x: Tensor) -> typ:
                        return example

                    op = self.get_op(f"{self.test_ns}::foo")
                    result = op(torch.randn([]))
                    self.assertEqual(result, example, msg=f"{typ} {example}")
                finally:
                    custom_ops._destroy(f"{self.test_ns}::foo")

    def test_supported_return_types_multi_return(self):
        for typ in torch._library.infer_schema.SUPPORTED_RETURN_TYPES:
            for example in self._generate_examples(typ):
                try:

                    @custom_ops.custom_op(f"{self.test_ns}::foo")
                    def foo(x: Tensor) -> Tuple[typ, typ]:
                        raise NotImplementedError

                    @custom_ops.impl(f"{self.test_ns}::foo")
                    def foo_impl(x: Tensor) -> Tuple[typ, typ]:
                        return (example, example)

                    op = self.get_op(f"{self.test_ns}::foo")
                    result = op(torch.randn([]))
                    expected = (example, example)
                    self.assertEqual(result, expected, msg=f"{typ} {example}")
                finally:
                    custom_ops._destroy(f"{self.test_ns}::foo")

    def test_supported_param_types(self):
        for typ in torch._library.infer_schema.SUPPORTED_PARAM_TYPES:

            @custom_ops.custom_op(f"{TestCustomOp.test_ns}::foo")
            def foo(x: Tensor, y: typ) -> Tensor:
                raise NotImplementedError

            yeet = None

            @custom_ops.impl(f"{TestCustomOp.test_ns}::foo", device_types=["cpu"])
            def foo_cpu(x, y):
                nonlocal yeet
                yeet = y
                return x.clone()

            try:
                for example in self._generate_examples(typ):
                    op = self.get_op(f"{self.test_ns}::foo")
                    op(torch.randn([]), example)
                    self.assertEqual(yeet, example, msg=f"{typ} {example}")
                    yeet = None
            finally:
                custom_ops._destroy(f"{TestCustomOp.test_ns}::foo")

    def test_sequences(self):
        # Sequence[int] gets automagically turned into int[] in the schema.
        # This test checks that we actually do support arbitrary sequence types.
        class MySequence(collections.abc.Sequence):
            def __init__(self) -> None:
                self._container = [1, 2, 3]

            def __getitem__(self, idx):
                return self._container[idx]

            def __len__(self):
                return len(self._container)

        @custom_ops.custom_op(f"{self.test_ns}::foo")
        def foo(x: torch.Tensor, sizes: Sequence[int]) -> torch.Tensor:
            raise NotImplementedError

        called = 0

        @custom_ops.impl(f"{self.test_ns}::foo", device_types="cpu")
        def foo_cpu(x, sizes):
            nonlocal called
            called += 1
            # Dispatcher will normalize the sequence type into a List
            self.assertEqual(sizes, [1, 2, 3])
            return x.clone()

        x = torch.randn([])
        seq = MySequence()
        op = self.get_op(f"{self.test_ns}::foo")
        op(x, seq)
        self.assertEqual(called, 1)

    def test_unsupported_param_types(self):
        # Not comprehensive (it doesn't need to be), just a check that our mechanism works
        with self.assertRaisesRegex(ValueError, "unsupported type"):

            @custom_ops.custom_op(f"{TestCustomOp.test_ns}::foo")
            def foo(x: Tensor, y: List[Optional[int]]) -> Tensor:
                raise NotImplementedError

            del foo

        with self.assertRaisesRegex(ValueError, "unsupported type"):
            # int[N] in Dispatcher is a bit wild, so we don't try to support it.
            @custom_ops.custom_op(f"{TestCustomOp.test_ns}::foo")
            def foo(x: Tensor, y: Tuple[int, int]) -> Tensor:
                raise NotImplementedError

            del foo

        with self.assertRaisesRegex(ValueError, r"For example, list\[int\]"):
            # test that we propose a correct and supported type.
            @torch.library.custom_op(f"{TestCustomOp.test_ns}::foo", mutates_args={})
            def foo(x: Tensor, y: Tuple[int, int]) -> Tensor:
                raise NotImplementedError

            del foo

        with self.assertRaises(ValueError) as cm:

            @torch.library.custom_op(f"{TestCustomOp.test_ns}::foo", mutates_args={})
            def foo(x: Tensor, y: Tuple[int, float]) -> Tensor:
                raise NotImplementedError

            del foo

            self.assertNotIn("example", str(cm.exception), "")

        with self.assertRaisesRegex(ValueError, "unsupported type"):

            @custom_ops.custom_op(f"{TestCustomOp.test_ns}::foo")
            def foo(x: Tensor, y: Callable) -> Tensor:
                raise NotImplementedError

            del foo

        # Define a named tuple for a Point with x and y coordinates
        Point = collections.namedtuple("Point", ["x", "y"])
        with self.assertRaisesRegex(ValueError, "unsupported type"):

            @custom_ops.custom_op(f"{TestCustomOp.test_ns}::foo")
            def foo(x: Tensor, y: Point) -> Tensor:
                raise NotImplementedError

            del foo

    def test_supported_schemas(self):
        # All of these should already be tested by PyTorch codegen
        # (we share the same mechanism), but here's a sanity check.
        schemas = [
            "(Tensor x) -> Tensor",
            "(Tensor x) -> Tensor y",
            "(Tensor[] x) -> Tensor y",
            "(Tensor x) -> (Tensor, Tensor)",
            "(Tensor x) -> (Tensor y, Tensor z)",
            "(Tensor x) -> (Tensor y, Tensor z)",
        ]
        other_schemas = [
            "(Tensor x, Tensor w) -> (Tensor y, Tensor z)",
            "(Tensor x, Tensor w) -> (Tensor, Tensor)",
            "(Tensor x, Tensor w) -> Tensor",
            "(Tensor? x, Tensor w) -> Tensor",
            "(Tensor? x, Tensor[] w) -> Tensor",
            "(Tensor x, int[] w) -> Tensor",
            "(Tensor x, SymInt[] w) -> Tensor",
            "(Tensor x, Scalar w) -> Tensor",
            "(Tensor x, float w) -> Tensor",
            "(Tensor x, float? w) -> Tensor",
            "(Tensor x, bool[] w) -> Tensor",
        ]

        for schema in schemas:
            custom_ops.custom_op(f"{TestCustomOp.test_ns}::foo", schema)
            custom_ops._destroy(f"{TestCustomOp.test_ns}::foo")
        for schema in other_schemas:
            custom_ops.custom_op(f"{TestCustomOp.test_ns}::bar", schema)
            custom_ops._destroy(f"{TestCustomOp.test_ns}::bar")

    def test_reserved_ns(self):
        from torch._custom_op.impl import RESERVED_NS

        for ns in RESERVED_NS:
            with self.assertRaisesRegex(ValueError, "is a reserved namespace"):
                custom_ops.custom_op(f"{ns}::foo", "(Tensor x) -> Tensor")

            with self.assertRaisesRegex(ValueError, "is a reserved namespace"):

                @custom_ops.custom_op(f"{ns}::foo2")
                def foo2(x: torch.Tensor) -> torch.Tensor:
                    raise NotImplementedError

    def test_private_ctor(self):
        with self.assertRaisesRegex(RuntimeError, "CustomOp constructor is private"):
            CustomOp(None, None, None, None, None)

    def test_lifetime(self):
        @custom_ops.custom_op(f"{TestCustomOp.test_ns}::foo")
        def foo(x: torch.Tensor) -> torch.Tensor:
            raise NotImplementedError

        custom_op = torch._custom_op.impl.get_op(f"{TestCustomOp.test_ns}::foo")

        # We can't define an op multiple times,
        with self.assertRaisesRegex(RuntimeError, "multiple times"):

            @custom_ops.custom_op(f"{TestCustomOp.test_ns}::foo")
            def foo(x: torch.Tensor) -> torch.Tensor:  # noqa: F811
                raise NotImplementedError

        # Unless we delete the original op.
        custom_ops._destroy(f"{TestCustomOp.test_ns}::foo")

        # Smoke test
        @custom_ops.custom_op(f"{TestCustomOp.test_ns}::foo")
        def foo(x: torch.Tensor) -> torch.Tensor:  # noqa: F811
            raise NotImplementedError

        custom_ops._destroy(f"{TestCustomOp.test_ns}::foo")

    def test_autograd_notimplemented(self):
        @custom_ops.custom_op(f"{TestCustomOp.test_ns}::foo")
        def foo(x: torch.Tensor) -> torch.Tensor:  # noqa: F811
            raise NotImplementedError

        x = torch.randn(3, requires_grad=True)
        op = self.get_op(f"{self.test_ns}::foo")
        with self.assertRaisesRegex(RuntimeError, "Autograd has not been implemented"):
            op(x)
        custom_ops._destroy(f"{TestCustomOp.test_ns}::foo")
        del foo

        @custom_ops.custom_op(f"{TestCustomOp.test_ns}::foo")
        def foo(x: Sequence[torch.Tensor]) -> torch.Tensor:
            raise NotImplementedError

        x = torch.randn(3, requires_grad=True)
        y = torch.randn(3)
        op = self.get_op(f"{self.test_ns}::foo")
        with self.assertRaisesRegex(RuntimeError, "Autograd has not been implemented"):
            op([y, x])
        custom_ops._destroy(f"{TestCustomOp.test_ns}::foo")
        del foo

        @custom_ops.custom_op(f"{TestCustomOp.test_ns}::foo")
        def foo(x: torch.Tensor, y: torch.Tensor) -> torch.Tensor:
            raise NotImplementedError

        x = torch.randn(3, requires_grad=True)
        y = torch.randn(3)
        op = self.get_op(f"{self.test_ns}::foo")
        with self.assertRaisesRegex(RuntimeError, "Autograd has not been implemented"):
            op(y, x)
        custom_ops._destroy(f"{TestCustomOp.test_ns}::foo")

    def test_autograd_notimplemented_gradmode(self):
        @custom_ops.custom_op(f"{TestCustomOp.test_ns}::foo")
        def foo(x: torch.Tensor, y: torch.Tensor) -> torch.Tensor:
            raise NotImplementedError

        @custom_ops.impl(f"{TestCustomOp.test_ns}::foo")
        def foo_impl(x, y):
            return x * y

        x = torch.randn(3, requires_grad=True)
        y = torch.randn(3)
        op = self.get_op(f"{self.test_ns}::foo")
        with torch.no_grad():
            # Shouldn't raise, because we are in no_grad
            op(y, x)

    def test_impl_cpu(self):
        @custom_ops.custom_op(f"{TestCustomOp.test_ns}::foo")
        def foo(x: torch.Tensor) -> torch.Tensor:
            raise NotImplementedError

        @custom_ops.impl(f"{TestCustomOp.test_ns}::foo", device_types="cpu")
        def foo_cpu(x):
            return x.sin()

        x = torch.randn(3)
        op = self.get_op(f"{self.test_ns}::foo")
        result = op(x)
        self.assertEqual(result, foo_cpu(x))

    def test_impl_invalid_devices(self):
        @custom_ops.custom_op(f"{TestCustomOp.test_ns}::foo")
        def foo(x: torch.Tensor) -> torch.Tensor:
            raise NotImplementedError

        def foo_impl(x):
            return x.sin()

        from torch._custom_op.impl import SUPPORTED_DEVICE_TYPE_TO_KEY

        for device_type in SUPPORTED_DEVICE_TYPE_TO_KEY:
            # Smoke test: should not raise error
            custom_ops.impl(f"{TestCustomOp.test_ns}::foo", device_types=device_type)(
                foo_impl
            )

        # Not supported by this API: we can either support them in the future
        # or provide some other CustomOp.def_* function. This depends on how
        # common the use cases are.
        for invalid_type in ["hip", "xla", "mkldnn", ["cpu", "hip"]]:
            with self.assertRaisesRegex(ValueError, "we only support device_type"):
                custom_ops.impl(
                    f"{TestCustomOp.test_ns}::foo", device_types=invalid_type
                )(foo_impl)

    def test_backward_partially_registered(self):
        @custom_ops.custom_op(f"{TestCustomOp.test_ns}::foo")
        def foo(x: torch.Tensor) -> torch.Tensor:
            raise NotImplementedError

        @custom_ops.impl(f"{TestCustomOp.test_ns}::foo")
        def foo_impl(x):
            return x.sin()

        @custom_ops.impl_backward(f"{TestCustomOp.test_ns}::foo")
        def foo_backward(ctx, saved, grad):
            return grad * saved.cos()

        x = torch.randn([], requires_grad=True)
        op = self.get_op(f"{self.test_ns}::foo")
        with self.assertRaisesRegex(
            RuntimeError, "unable to find a 'save_for_backward'"
        ):
            y = op(x)
            y.backward()

    def test_save_for_backward_inputs_are_namedtuple(self):
        @custom_ops.custom_op(f"{TestCustomOp.test_ns}::foo")
        def foo(x: torch.Tensor) -> torch.Tensor:
            raise NotImplementedError

        @custom_ops.impl(f"{TestCustomOp.test_ns}::foo")
        def foo_impl(x):
            return x.sin()

        hit = 0

        @custom_ops.impl_save_for_backward(f"{TestCustomOp.test_ns}::foo")
        def foo_save_for_backward(inputs, output):
            nonlocal hit
            hit += 1
            self.assertTrue(isinstance(inputs, tuple))
            self.assertEqual(list(inputs._asdict().keys()), ["x"])
            return inputs.x

        @custom_ops.impl_backward(f"{TestCustomOp.test_ns}::foo")
        def foo_backward(ctx, saved, grad):
            return {"x": grad * saved.cos()}

        x = torch.randn([], requires_grad=True)
        op = self.get_op(f"{self.test_ns}::foo")
        y = op(x)
        self.assertEqual(hit, 1)
        y.backward()
        self.assertEqual(hit, 1)

    def test_backward_returns_dict(self):
        @custom_ops.custom_op(f"{TestCustomOp.test_ns}::foo")
        def foo(x: torch.Tensor) -> torch.Tensor:
            raise NotImplementedError

        @custom_ops.impl(f"{TestCustomOp.test_ns}::foo")
        def foo_impl(x):
            return x.sin()

        @custom_ops.impl_save_for_backward(f"{TestCustomOp.test_ns}::foo")
        def foo_save_for_backward(inputs, output):
            return inputs.x

        @custom_ops.impl_backward(f"{TestCustomOp.test_ns}::foo")
        def foo_backward(ctx, saved, grad):
            return grad * saved.cos()

        x = torch.randn([], requires_grad=True)
        op = self.get_op(f"{self.test_ns}::foo")
        y = op(x)
        with self.assertRaisesRegex(RuntimeError, "to be a dict"):
            y.backward()

    def test_backward_dict_invalid_keys(self):
        @custom_ops.custom_op(f"{TestCustomOp.test_ns}::foo")
        def foo(x: torch.Tensor) -> torch.Tensor:
            raise NotImplementedError

        @custom_ops.impl(f"{TestCustomOp.test_ns}::foo")
        def foo_impl(x):
            return x.sin()

        @custom_ops.impl_save_for_backward(f"{TestCustomOp.test_ns}::foo")
        def foo_save_for_backward(inputs, output):
            return inputs.x

        @custom_ops.impl_backward(f"{TestCustomOp.test_ns}::foo")
        def foo_backward(ctx, saved, grad):
            return {"x": grad * saved.cos(), "y": None}

        x = torch.randn([], requires_grad=True)
        op = self.get_op(f"{self.test_ns}::foo")
        y = op(x)
        with self.assertRaisesRegex(RuntimeError, "to have keys {'x'}"):
            y.backward()

    def test_backward_dict_grad_for_nontensor(self):
        @custom_ops.custom_op(f"{TestCustomOp.test_ns}::foo")
        def foo(x: torch.Tensor, dim: int) -> torch.Tensor:
            raise NotImplementedError

        @custom_ops.impl(f"{TestCustomOp.test_ns}::foo")
        def foo_impl(x, dim):
            return x.sin()

        @custom_ops.impl_save_for_backward(f"{TestCustomOp.test_ns}::foo")
        def foo_save_for_backward(inputs, output):
            return inputs.x

        @custom_ops.impl_backward(f"{TestCustomOp.test_ns}::foo")
        def foo_backward(ctx, saved, grad):
            return {"x": grad * saved.cos(), "dim": None}

        x = torch.randn([], requires_grad=True)
        op = self.get_op(f"{self.test_ns}::foo")
        y = op(x, 32)
        with self.assertRaisesRegex(RuntimeError, "non-Tensor-like types"):
            y.backward()

    def test_backward_dict_requires_keys_for_input_tensors(self):
        @custom_ops.custom_op(f"{TestCustomOp.test_ns}::foo")
        def foo(x: torch.Tensor, y: torch.Tensor) -> torch.Tensor:
            raise NotImplementedError

        @custom_ops.impl(f"{TestCustomOp.test_ns}::foo")
        def foo_impl(x, y):
            return x.sin()

        @custom_ops.impl_save_for_backward(f"{TestCustomOp.test_ns}::foo")
        def foo_save_for_backward(inputs, output):
            return inputs.x

        @custom_ops.impl_backward(f"{TestCustomOp.test_ns}::foo")
        def foo_backward(ctx, saved, grad):
            return {"x": grad * saved.cos()}

        x = torch.randn([], requires_grad=True)
        op = self.get_op(f"{self.test_ns}::foo")
        y = op(x, x)
        with self.assertRaisesRegex(RuntimeError, r"to have keys {.*'y'.*}"):
            y.backward()

    def test_backward_dict_requires_keys_for_input_optional_tensors(self):
        @custom_ops.custom_op(f"{TestCustomOp.test_ns}::foo")
        def foo(x: torch.Tensor, y: Optional[torch.Tensor]) -> torch.Tensor:
            raise NotImplementedError

        @custom_ops.impl(f"{TestCustomOp.test_ns}::foo")
        def foo_impl(x, y):
            return x.sin()

        @custom_ops.impl_save_for_backward(f"{TestCustomOp.test_ns}::foo")
        def foo_save_for_backward(inputs, output):
            return inputs.x

        @custom_ops.impl_backward(f"{TestCustomOp.test_ns}::foo")
        def foo_backward(ctx, saved, grad):
            return {"x": grad * saved.cos()}

        x = torch.randn([], requires_grad=True)
        op = self.get_op(f"{self.test_ns}::foo")
        y = op(x, None)
        with self.assertRaisesRegex(RuntimeError, r"to have keys {.*'y'.*}"):
            y.backward()

    def test_backward_grads_are_tensor_or_none(self):
        @custom_ops.custom_op(f"{TestCustomOp.test_ns}::foo")
        def foo(x: torch.Tensor) -> torch.Tensor:
            raise NotImplementedError

        @custom_ops.impl(f"{TestCustomOp.test_ns}::foo")
        def foo_impl(x):
            return x.sin()

        @custom_ops.impl_save_for_backward(f"{TestCustomOp.test_ns}::foo")
        def foo_save_for_backward(inputs, output):
            return inputs.x

        @custom_ops.impl_backward(f"{TestCustomOp.test_ns}::foo")
        def foo_backward(ctx, saved, grad):
            return {"x": (grad * saved.cos(),)}

        x = torch.randn([], requires_grad=True)
        op = self.get_op(f"{self.test_ns}::foo")
        y = op(x)
        with self.assertRaisesRegex(RuntimeError, "either None or a Tensor"):
            y.backward()

    def test_backward_tensorlist_input_requires_list_grads_with_same_numel(self):
        @custom_ops.custom_op(f"{TestCustomOp.test_ns}::foo")
        def foo(xs: Sequence[torch.Tensor]) -> torch.Tensor:
            raise NotImplementedError

        @custom_ops.impl(f"{TestCustomOp.test_ns}::foo")
        def foo_impl(xs):
            return xs[0].sin()

        @custom_ops.impl_save_for_backward(f"{TestCustomOp.test_ns}::foo")
        def foo_save_for_backward(inputs, output):
            return inputs.xs[0]

        @custom_ops.impl_backward(f"{TestCustomOp.test_ns}::foo")
        def foo_backward(ctx, saved, grad):
            return {"xs": [grad * saved.cos(), None]}

        xs = [torch.randn([], requires_grad=True) for _ in range(3)]
        op = self.get_op(f"{self.test_ns}::foo")
        y = op(xs)
        with self.assertRaisesRegex(RuntimeError, "3 gradients but got 2"):
            y.backward()

    def test_backward_tensorlist_input_requires_list_grads_none_or_Tensor(self):
        @custom_ops.custom_op(f"{TestCustomOp.test_ns}::foo")
        def foo(xs: Sequence[torch.Tensor]) -> torch.Tensor:
            raise NotImplementedError

        @custom_ops.impl(f"{TestCustomOp.test_ns}::foo")
        def foo_impl(xs):
            return xs[0].sin()

        @custom_ops.impl_save_for_backward(f"{TestCustomOp.test_ns}::foo")
        def foo_save_for_backward(inputs, output):
            return inputs.xs[0]

        @custom_ops.impl_backward(f"{TestCustomOp.test_ns}::foo")
        def foo_backward(ctx, saved, grad):
            return {"xs": [grad * saved.cos(), None, (None,)]}

        xs = [torch.randn([], requires_grad=True) for _ in range(3)]
        op = self.get_op(f"{self.test_ns}::foo")
        y = op(xs)
        with self.assertRaisesRegex(RuntimeError, "None or Tensor"):
            y.backward()

    def test_backward_tensorlist_input_requires_list_grads(self):
        @custom_ops.custom_op(f"{TestCustomOp.test_ns}::foo")
        def foo(xs: Sequence[torch.Tensor]) -> torch.Tensor:
            raise NotImplementedError

        @custom_ops.impl(f"{TestCustomOp.test_ns}::foo")
        def foo_impl(xs):
            return xs[0].sin()

        @custom_ops.impl_save_for_backward(f"{TestCustomOp.test_ns}::foo")
        def foo_save_for_backward(inputs, output):
            return inputs.xs[0]

        @custom_ops.impl_backward(f"{TestCustomOp.test_ns}::foo")
        def foo_backward(ctx, saved, grad):
            return {"xs": None}

        xs = [torch.randn([], requires_grad=True) for _ in range(3)]
        op = self.get_op(f"{self.test_ns}::foo")
        y = op(xs)
        with self.assertRaisesRegex(RuntimeError, "list of gradients"):
            y.backward()

    def test_backward_output_differentiability_type(self):
        @custom_ops.custom_op(f"{TestCustomOp.test_ns}::foo")
        def foo(xs: Sequence[torch.Tensor]) -> torch.Tensor:
            raise NotImplementedError

        with self.assertRaisesRegex(RuntimeError, "output_differentiability"):

            @custom_ops.impl_backward(
                f"{TestCustomOp.test_ns}::foo", output_differentiability=True
            )
            def foo_backward(ctx, saved, grad):
                return {"xs": None}

    def test_backward_output_differentiability_numel(self):
        @custom_ops.custom_op(f"{TestCustomOp.test_ns}::foo")
        def foo(xs: Sequence[torch.Tensor]) -> Tuple[torch.Tensor, torch.Tensor]:
            raise NotImplementedError

        with self.assertRaisesRegex(RuntimeError, "output_differentiability"):

            @custom_ops.impl_backward(
                f"{TestCustomOp.test_ns}::foo", output_differentiability=[True]
            )
            def foo_backward(ctx, saved, grad):
                return {"xs": None}

    def test_backward_output_differentiability_tensorlist(self):
        @custom_ops.custom_op(f"{self.test_ns}::foo")
        def foo(x: Tensor) -> Tuple[List[Tensor], Tensor]:
            raise NotImplementedError

        @custom_ops.impl(f"{self.test_ns}::foo")
        def foo_impl(x):
            return [x.clone(), x.clone()], x.clone()

        @custom_ops.impl_save_for_backward(f"{TestCustomOp.test_ns}::foo")
        def foo_save_for_backward(inputs, output):
            return []

        @custom_ops.impl_backward(
            f"{TestCustomOp.test_ns}::foo", output_differentiability=[False, True]
        )
        def foo_backward(ctx, saved, grad_lst, grad):
            return {"x": grad}

        op = self.get_op(f"{self.test_ns}::foo")
        x = torch.randn(3, requires_grad=True)
        [a, b], c = op(x)
        self.assertFalse(a.requires_grad)
        self.assertFalse(b.requires_grad)
        self.assertTrue(c.requires_grad)

    def test_backward_output_differentiability_non_tensor(self):
        @custom_ops.custom_op(f"{self.test_ns}::foo")
        def foo(x: Tensor) -> Tuple[Tensor, int]:
            raise NotImplementedError

        @custom_ops.impl(f"{self.test_ns}::foo")
        def foo_impl(x):
            return x.clone(), 3

        @custom_ops.impl_save_for_backward(f"{TestCustomOp.test_ns}::foo")
        def foo_save_for_backward(inputs, output):
            return []

        @custom_ops.impl_backward(
            f"{TestCustomOp.test_ns}::foo", output_differentiability=[True, True]
        )
        def foo_backward(ctx, saved, grad0, grad1):
            return {"x": grad0}

        op = self.get_op(f"{self.test_ns}::foo")
        x = torch.randn(3, requires_grad=True)
        with self.assertRaisesRegex(RuntimeError, "is not a Tensor"):
            op(x)

    @unittest.skipIf(not TEST_CUDA, "requires CUDA")
    def test_impl_separate(self):
        @custom_ops.custom_op(f"{TestCustomOp.test_ns}::foo")
        def foo(x: torch.Tensor) -> torch.Tensor:
            raise NotImplementedError

        @custom_ops.impl(f"{TestCustomOp.test_ns}::foo", device_types="cpu")
        def foo_cpu(x):
            return x.sin()

        @custom_ops.impl(f"{TestCustomOp.test_ns}::foo", device_types="cuda")
        def foo_cuda(x):
            return x.cos()

        x = torch.randn(3)
        op = self.get_op(f"{self.test_ns}::foo")
        result = op(x)
        self.assertEqual(result, foo_cpu(x))

        x_cuda = x.cuda()
        op = self.get_op(f"{self.test_ns}::foo")
        result = op(x_cuda)
        self.assertEqual(result, foo_cuda(x_cuda))

    @unittest.skipIf(not TEST_CUDA, "requires CUDA")
    def test_impl_multiple(self):
        @custom_ops.custom_op(f"{TestCustomOp.test_ns}::foo")
        def foo(x: torch.Tensor) -> torch.Tensor:
            raise NotImplementedError

        @custom_ops.impl(f"{TestCustomOp.test_ns}::foo")
        def foo_impl(x):
            return x.cos()

        op = self.get_op(f"{self.test_ns}::foo")
        x = torch.randn(3)
        result = op(x)
        self.assertEqual(result, foo_impl(x))

        x_cuda = x.cuda()
        result = op(x_cuda)
        self.assertEqual(result, foo_impl(x_cuda))

    def test_impl_abstract_overload(self):
        lib = self.lib()
        lib.define("sin.blah(Tensor x) -> Tensor")

        torch.library.register_fake(
            f"{self.test_ns}::sin.blah", torch.empty_like, lib=lib
        )

        op = self.ns().sin.blah
        x = torch.randn(3, device="meta")
        op(x)

    def test_impl_meta(self):
        @custom_ops.custom_op(f"{TestCustomOp.test_ns}::foo")
        def foo(x: torch.Tensor, dim: int) -> torch.Tensor:
            raise NotImplementedError

        @torch.library.register_fake(f"{TestCustomOp.test_ns}::foo", lib=self.lib())
        def foo_meta(x, dim):
            output_shape = list(x.shape)
            del output_shape[dim]
            return x.new_empty(output_shape)

        x = torch.randn(2, 3, device="meta")
        op = self.get_op(f"{self.test_ns}::foo")
        result = op(x, 1)
        self.assertEqual(result.shape, foo_meta(x, 1).shape)

    def test_duplicate_impl(self):
        @custom_ops.custom_op(f"{TestCustomOp.test_ns}::foo")
        def foo(x: torch.Tensor, dim: int) -> torch.Tensor:
            raise NotImplementedError

        @torch.library.register_fake(f"{TestCustomOp.test_ns}::foo", lib=self.lib())
        def foo_meta(x, dim):
            output_shape = list(x.shape)
            del output_shape[dim]
            return x.new_empty(output_shape)

        with self.assertRaisesRegex(RuntimeError, r"test_custom_ops.py:\d+"):

            @torch.library.register_fake(f"{TestCustomOp.test_ns}::foo", lib=self.lib())
            def foo_meta2(x, dim):
                output_shape = list(x.shape)
                del output_shape[dim]
                return x.new_empty(output_shape)

    def test_new_data_dependent_symint(self):
        @custom_ops.custom_op(f"{TestCustomOp.test_ns}::foo")
        def foo(x: torch.Tensor) -> torch.Tensor:
            raise NotImplementedError

        @torch.library.register_fake(f"{TestCustomOp.test_ns}::foo", lib=self.lib())
        def foo_meta(x):
            ctx = torch.library.get_ctx()
            r = ctx.new_dynamic_size(min=1)
            with self.assertRaisesRegex(ValueError, "greater than or equal to 0"):
                ctx.new_dynamic_size(min=-1)
            with self.assertRaisesRegex(ValueError, "SymInt"):
                ctx.new_dynamic_size(max=x.numel())
            # NB: You must return dynamic sizes!
            return x.new_empty(r)

        x = torch.randn(2, 3, device="cpu")
        op = self.get_op(f"{self.test_ns}::foo")
        make_fx(op, tracing_mode="symbolic")(x)

    def test_meta_for_data_dependent_shape_operation(self):
        x = torch.randn(10, device="meta")
        with self.assertRaisesRegex(RuntimeError, "data-dependent shape"):
            numpy_nonzero(x)

    def test_basic_make_fx(self):
        # More serious tests are in our CustomOp opinfo db,
        # this one is just a sanity check.
        @custom_ops.custom_op(f"{TestCustomOp.test_ns}::foo")
        def foo(x: torch.Tensor) -> torch.Tensor:
            raise NotImplementedError

        @torch.library.register_fake(f"{TestCustomOp.test_ns}::foo", lib=self.lib())
        def foo_meta(x):
            return x.sum()

        x = torch.randn(3)
        op = self.get_op(f"{self.test_ns}::foo")
        gm = make_fx(op, tracing_mode="symbolic")(x)
        self.assertTrue(f"{TestCustomOp.test_ns}.foo" in gm.code)

    def test_not_implemented_error(self):
        @custom_ops.custom_op(f"{TestCustomOp.test_ns}::foo")
        def foo(x: torch.Tensor) -> torch.Tensor:
            raise NotImplementedError

        x = torch.randn(3)
        op = self.get_op(f"{self.test_ns}::foo")
        with self.assertRaisesRegex(NotImplementedError, "cpu impl registered"):
            op(x)

        x = torch.randn(3, device="meta")
        with self.assertRaisesRegex(NotImplementedError, "no fake impl or Meta kernel"):
            op(x)

        @custom_ops.custom_op(f"{TestCustomOp.test_ns}::bar")
        def bar(sizes: Sequence[int]) -> torch.Tensor:
            raise NotImplementedError

        op = self.get_op(f"{self.test_ns}::bar")
        with self.assertRaisesRegex(NotImplementedError, "no Tensor inputs"):
            op((1, 2, 3))

    def test_data_dependent_basic(self):
        x = torch.randn(5, 5)
        gm = make_fx(numpy_nonzero, tracing_mode="symbolic")(x)
        self.assertTrue("nonzero" in gm.code)

    def test_data_dependent_fake_tracing(self):
        x = torch.randn(5, 5)
        # We've updated to attempt to use unbacked symints even for fake
        # tracing
        make_fx(numpy_nonzero, tracing_mode="fake")(x)

    def test_symints(self):
        def f(x):
            return torch.ops._torch_testing.numpy_view_copy(x, x.shape)

        x = torch.randn(2, 3, 4)
        gm = make_fx(f, tracing_mode="symbolic")(x)
        result = gm(x)
        self.assertEqual(result, f(x))
        self.assertExpectedInline(
            gm.code.strip(),
            """\
def forward(self, x_1):
    sym_size_int = torch.ops.aten.sym_size.int(x_1, 0)
    sym_size_int_1 = torch.ops.aten.sym_size.int(x_1, 1)
    sym_size_int_2 = torch.ops.aten.sym_size.int(x_1, 2)
    numpy_view_copy = torch.ops._torch_testing.numpy_view_copy.default(x_1, [sym_size_int, sym_size_int_1, sym_size_int_2]);  x_1 = sym_size_int = sym_size_int_1 = sym_size_int_2 = None
    return numpy_view_copy""",  # noqa: B950
        )

    @unittest.skipIf(IS_WINDOWS, "torch.compile doesn't work on windows")
    def test_data_dependent_compile(self):
        import torch._dynamo.testing
        from torch._dynamo.utils import counters

        counters.clear()
        cnt = torch._dynamo.testing.CompileCounter()

        @torch.compile(backend=cnt)
        def f(x):
            return numpy_nonzero(x.clone()).clone()

        f(torch.randn(10))

        self.assertEqual(len(counters["graph_break"]), 1)
        self.assertEqual(next(iter(counters["graph_break"].values())), 1)
        self.assertExpectedInline(
            next(iter(counters["graph_break"].keys())).replace(";", "\n"),
            """\
Dynamic shape operator
  Explanation: Operator `_torch_testing.numpy_nonzero.default`'s output shape depends on input Tensor data.
  Hint: Enable tracing of dynamic shape operators with `torch._dynamo.config.capture_dynamic_output_shape_ops = True`

  Developer debug context: _torch_testing.numpy_nonzero.default

 For more details about this graph break, please visit: https://meta-pytorch.github.io/compile-graph-break-site/gb/gb0036.html""",
        )

    # pre-existing problem: torch.compile(dynamic=True) will, by default,
    # graph break on data-dependent operations. Eventually we'll make it so
    # that it never graph breaks on data-dependent operations.
    @unittest.expectedFailure
    def test_data_dependent_nms_dynamic_compile(self):
        import torch._dynamo.testing
        from torch._dynamo.utils import counters

        counters.clear()
        cnt = torch._dynamo.testing.CompileCounter()

        @torch.compile(backend=cnt, dynamic=True)
        def f(x, s, i):
            return torch.ops._torch_testing.numpy_nms(x.clone(), s, i).clone()

        f(torch.randn(20, 4), torch.randn(20), 0.1)

        self.assertEqual(len(counters["graph_break"]), 0)

    def test_impl_on_existing_op(self):
        lib = self.lib()
        lib.define("foo(Tensor x) -> Tensor")
        qualname = f"{self.test_ns}::foo"

        @torch._custom_ops.impl(qualname)
        def foo_impl(x):
            return x.sin()

        op = self.get_op(qualname)
        x = torch.randn(3)
        result = op(x)
        self.assertEqual(result, x.sin())

    @parametrize(
        "key", ["CPU", "CUDA", "CompositeImplicitAutograd", "CompositeExplicitAutograd"]
    )
    def test_impl_on_existing_op_with_cpu_registration(self, key):
        lib = self.lib()
        lib.define("foo(Tensor x) -> Tensor")
        qualname = f"{self.test_ns}::foo"

        def foo_impl(x):
            return x.sin()

        lib.impl("foo", foo_impl, key)
        op = self.get_op(qualname)

        with self.assertRaisesRegex(RuntimeError, "already has an implementation"):
            custom_ops.impl(qualname, func=foo_impl)

    def test_abstract_impl_on_existing_op(self):
        lib = self.lib()
        lib.define("foo(Tensor x) -> Tensor")
        qualname = f"{self.test_ns}::foo"

        @torch.library.register_fake(qualname, lib=self.lib())
        def foo_impl(x):
            return x.sin()

        op = self.get_op(qualname)
        with torch._subclasses.FakeTensorMode():
            x = torch.randn(3)
            result = op(x)
            self.assertEqual(result.shape, x.shape)
            self.assertEqual(result.stride(), x.stride())

    def test_abstract_impl_on_existing_op_with_meta(self):
        lib = self.lib()
        lib.define("foo(Tensor x) -> Tensor")
        qualname = f"{self.test_ns}::foo"

        def foo_impl(x):
            return x.sin()

        lib.impl("foo", foo_impl, "Meta")
        op = self.get_op(qualname)

        with self.assertRaisesRegex(RuntimeError, r"already has .*Meta implementation"):
            torch.library.register_fake(qualname, foo_impl, lib=self.lib())

    def test_abstract_impl_on_existing_op_with_CompositeImplicitAutograd(self):
        lib = self.lib()
        lib.define("foo(Tensor x) -> Tensor")
        qualname = f"{self.test_ns}::foo"

        def foo_impl(x):
            return x.sin()

        lib.impl("foo", foo_impl, "CompositeImplicitAutograd")
        op = self.get_op(qualname)

        with self.assertRaisesRegex(RuntimeError, "CompositeImplicitAutograd"):
            torch.library.register_fake(qualname, foo_impl, lib=self.lib())

    def test_abstract_impl_on_existing_op_with_CompositeExplicitAutograd(self):
        lib = self.lib()
        lib.define("foo(Tensor x) -> Tensor")
        qualname = f"{self.test_ns}::foo"

        def foo_impl(x):
            return x.sin()

        lib.impl("foo", foo_impl, "CompositeExplicitAutograd")
        op = self.get_op(qualname)

        torch.library.register_fake(qualname, lambda x: x.sum(), lib=self.lib())
        with torch._subclasses.FakeTensorMode():
            x = torch.randn(10)
            result = op(x)
            self.assertEqual(result.shape, ())

    def _test_backward_impl_raises(self, qualname, err_regex):
        with self.assertRaisesRegex(RuntimeError, err_regex):

            @custom_ops.impl_save_for_backward(qualname)
            def foo2(x):
                return

        with self.assertRaisesRegex(RuntimeError, err_regex):

            @custom_ops.impl_backward(qualname)
            def foo3(x):
                return

    def test_backward_impl_on_existing_op_incorrect_schema_views(self):
        lib = self.lib()
        lib.define("foo(Tensor(a) x) -> Tensor(a)")
        qualname = f"{self.test_ns}::foo"
        self._test_backward_impl_raises(qualname, "operator that returns views")

    def test_backward_impl_on_existing_op_incorrect_schema_mutable(self):
        lib = self.lib()
        lib.define("foo(Tensor(a!) x) -> Tensor")
        qualname = f"{self.test_ns}::foo"
        self._test_backward_impl_raises(qualname, "non-functional")

    def test_backward_impl_on_existing_op_incorrect_schema_no_output(self):
        lib = self.lib()
        lib.define("foo(Tensor x) -> ()")
        qualname = f"{self.test_ns}::foo"
        self._test_backward_impl_raises(qualname, "no returns")

    def test_backward_impl_on_existing_op_CompositeImplicitAutograd(self):
        lib = self.lib()
        lib.define("foo(Tensor x) -> Tensor")
        qualname = f"{self.test_ns}::foo"
        lib.impl("foo", lambda x: x.sin().cos(), "CompositeImplicitAutograd")
        self._test_backward_impl_raises(qualname, "CompositeImplicitAutograd")

    @parametrize("key", ["Autograd", "AutogradCPU", "AutogradCUDA"])
    def test_backward_impl_on_existing_op_with_key(self, key):
        lib = self.lib()
        lib.define("foo(Tensor x) -> Tensor")
        qualname = f"{self.test_ns}::foo"
        lib.impl("foo", lambda x: x.sin().cos(), key)
        self._test_backward_impl_raises(qualname, key)

    def test_is_functional_schema(self):
        tests = {
            "foo(Tensor x) -> Tensor": True,
            "foo(Tensor(a) x) -> Tensor": True,
            "foo(Tensor(a!) x) -> Tensor": False,
            "foo(Tensor(a) x) -> Tensor(a)": False,
            "foo(Tensor x) -> ()": False,
        }
        for schema_str, expected in tests.items():
            res = torch._library.utils.is_functional_schema(schema_str)
            self.assertEqual(res, expected)

            from torchgen.model import FunctionSchema

            schema = FunctionSchema.parse(schema_str)
            res = torch._library.utils.is_functional_schema(schema)
            self.assertEqual(res, expected)

            schema = torch._C.parse_schema(schema_str)
            res = torch._library.utils.is_functional_schema(schema)
            self.assertEqual(res, expected)

    def test_incorrect_schema_types(self):
        with torch.library._scoped_library("mylib", "FRAGMENT") as lib:
            with self.assertRaisesRegex(RuntimeError, "unknown type specifier"):
                lib.define("foo12(Tensor a) -> asdfasdf")
            with self.assertRaisesRegex(RuntimeError, "unknown type specifier"):
                lib.define("foo12(asdf a) -> Tensor")
            with self.assertRaisesRegex(RuntimeError, "Use `SymInt` or `int`"):
                lib.define("foo12(int64_t a) -> Tensor")
            with self.assertRaisesRegex(RuntimeError, "Use `float`"):
                lib.define("foo12(double a) -> Tensor")

    def test_is_tensorlist_like_type(self):
        tensorlists = [
            # Tensor[]
            torch.ops.aten.where.default._schema.returns[0].type,
            # Tensor?[]
            torch.ops.aten.index.Tensor._schema.arguments[1].type,
            # Tensor[]?
            torch._C.parse_schema("foo(Tensor[]? x) -> ()").arguments[0].type,
            # Tensor?[]?
            torch._C.parse_schema("foo(Tensor?[]? x) -> ()").arguments[0].type,
        ]
        non_tensorlists = [
            # Tensor
            torch.ops.aten.sin.default._schema.arguments[0].type,
            # IntList
            torch.ops.aten.sum.dim_IntList._schema.arguments[1].type,
        ]
        for a in tensorlists:
            self.assertTrue(torch._library.utils.is_tensorlist_like_type(a))
        for a in non_tensorlists:
            self.assertFalse(torch._library.utils.is_tensorlist_like_type(a))

    def test_backward_impl_on_existing_op(self):
        lib = self.lib()
        lib.define("foo(Tensor x) -> Tensor")
        qualname = f"{self.test_ns}::foo"

        @custom_ops.impl(qualname)
        def foo_impl(x):
            with torch.no_grad():
                return x.sin()

        @custom_ops.impl_save_for_backward(qualname)
        def foo_save_for_backward(inputs, output):
            return inputs.x

        @custom_ops.impl_backward(qualname)
        def foo_backward(ctx, saved, grad_out):
            return {"x": grad_out * saved.cos()}

        op = self.get_op(qualname)
        x = torch.randn([], requires_grad=True)
        y = op(x)
        (gx,) = torch.autograd.grad(y, x)
        self.assertEqual(gx, x.cos())

    @parametrize(
        "tags",
        [
            subtest(torch.Tag.pointwise, "single"),
            subtest((torch.Tag.pointwise,), "tuple"),
            subtest([torch.Tag.pointwise], "list"),
        ],
    )
    def test_define_with_tags(self, tags):
        lib = self.lib()
        tags = (torch.Tag.pointwise,)
        torch.library.define(
            f"{self.test_ns}::foo", "(Tensor x) -> Tensor", lib=lib, tags=tags
        )
        actual = self.ns().foo.default.tags
        self.assertTrue(isinstance(actual, list))
        self.assertEqual(actual, list(tags))

    def test_builtin_aten_ops_are_pt2_compliant(self):
        for op in [torch.ops.aten.sin.default, torch.ops.aten.sum.dim_IntList]:
            self.assertIn(torch.Tag.pt2_compliant_tag, op.tags)

    def test_builtin_torchscript_ops(self):
        for op in [torch.ops.aten.sub.complex, torch.ops.aten.mul.complex]:
            self.assertIn(torch.Tag.pt2_compliant_tag, op.tags)

    def test_autogen_aten_ops_are_pt2_compliant(self):
        for op in [torch.ops.aten.fill.Tensor_out]:
            self.assertIn(torch.Tag.generated, op.tags)
            self.assertIn(torch.Tag.pt2_compliant_tag, op.tags)

    def test_resolve_packet(self):
        x = torch.randn(3)
        result = torch._C._jit_resolve_packet("aten::sum", x)
        self.assertEqual(result, "default")

        result = torch._C._jit_resolve_packet("aten::sum", x, dim=1)
        self.assertEqual(result, "dim_IntList")

        with self.assertRaisesRegex(RuntimeError, "failed to match any schema"):
            result = torch._C._jit_resolve_packet("aten::sum", x, x, x)

    def test_define_bad_schema(self):
        lib = self.lib()
        with self.assertRaisesRegex(ValueError, "expected schema to look like"):
            torch.library.define(f"{self.test_ns}::foo", "foo(Tensor x) -> Tensor")

    def test_define_and_impl(self):
        lib = self.lib()
        torch.library.define(f"{self.test_ns}::foo", "(Tensor x) -> Tensor", lib=lib)

        @torch.library.impl(f"{self.test_ns}::foo", "CPU", lib=lib)
        def f(x):
            return torch.from_numpy(np.sin(x.numpy()))

        x = torch.randn(3)
        y = self.ns().foo(x)
        assert torch.allclose(y, x.sin())

    def test_define_validation(self):
        with self.assertRaisesRegex(ValueError, "namespace"):
            torch.library.define("foo", "(Tensor x) -> Tensor")

    def test_legacy_define(self):
        lib = self.lib()

        @torch.library.define(lib, "foo(Tensor x) -> Tensor")
        def f(x):
            return torch.from_numpy(np.sin(x.numpy()))

        x = torch.randn(3)
        y = self.ns().foo(x)
        assert torch.allclose(y, x.sin())

    def test_impl_function(self):
        lib = self.lib()
        torch.library.define(f"{self.test_ns}::foo", "(Tensor x) -> Tensor", lib=lib)

        def f(x):
            return torch.from_numpy(np.sin(x.numpy()))

        torch.library.impl(f"{self.test_ns}::foo", "CPU", f, lib=lib)
        x = torch.randn(3)
        y = self.ns().foo(x)
        assert torch.allclose(y, x.sin())

    def test_legacy_impl(self):
        lib = self.lib()
        torch.library.define(f"{self.test_ns}::foo", "(Tensor x) -> Tensor", lib=lib)

        @torch.library.impl(lib, "foo", "CPU")
        def f(x):
            return torch.from_numpy(np.sin(x.numpy()))

        x = torch.randn(3)
        y = self.ns().foo(x)
        assert torch.allclose(y, x.sin())

    def test_defined_in_python(self):
        self.assertFalse(torch.ops.aten.sin.default._defined_in_python)
        self.assertFalse(torch.ops.aten.sum.dim_IntList._defined_in_python)

        lib = self.lib()
        torch.library.define("{self._test_ns}::foo", "(Tensor x) -> Tensor", lib=lib)
        ns = self.ns()
        self.assertTrue(ns.foo.default._defined_in_python)

        torch.library.define(
            "{self._test_ns}::bar.overload", "(Tensor x) -> Tensor", lib=lib
        )
        self.assertTrue(ns.bar.overload._defined_in_python)

    def _test_impl_device(self, name, types, device):
        lib = self.lib()
        torch.library.define(f"{self.test_ns}::{name}", "(Tensor x) -> Tensor", lib=lib)

        @torch.library.impl(f"{self.test_ns}::{name}", types)
        def f(x):
            x_np = x.cpu().numpy()
            y = torch.from_numpy(np.sin(x_np))
            return y.to(device=x.device)

        x = torch.randn(3, device=device)
        y = getattr(self.ns(), name)(x)
        assert torch.allclose(y, x.sin())

    def test_impl_device_cpu(self):
        self._test_impl_device("foo1", "default", "cpu")
        self._test_impl_device("foo2", ["cpu"], "cpu")
        self._test_impl_device("foo3", ["cpu", "cuda"], "cpu")

    @unittest.skipIf(not TEST_CUDA, "requires cuda")
    def test_impl_device_cuda(self):
        self._test_impl_device("foo4", "default", "cuda")
        self._test_impl_device("foo5", ["cuda"], "cuda")
        self._test_impl_device("foo6", ["cpu", "cuda"], "cuda")

    def test_impl_device_function(self):
        lib = self.lib()
        torch.library.define(f"{self.test_ns}::foo", "(Tensor x) -> Tensor", lib=lib)

        def f(x):
            x_np = x.cpu().numpy()
            y = torch.from_numpy(np.sin(x_np))
            return y.to(device=x.device)

        torch.library.impl(f"{self.test_ns}::foo", "default", f, lib=lib)
        x = torch.randn(3)
        y = self.ns().foo(x)
        assert torch.allclose(y, x.sin())

    def test_impl_device_invalid(self):
        with self.assertRaisesRegex(RuntimeError, "Expected one of cpu, cuda"):
            torch.library.impl("blah::blah", "somethingsomething")

    def test_override_impl(self):
        lib = self.lib()
        op_name = f"{self.test_ns}::foo"
        torch.library.define(op_name, "(Tensor x) -> Tensor", lib=lib)
        op = self.ns().foo.default

        def foo_impl1(x):
            return x * 1

        # Register cpu impl to foo_impl1
        lib.impl("foo", foo_impl1, "CPU")
        self.assertEqual(op(torch.ones(3)), torch.ones(3))

        def foo_impl2(x):
            return torch.cat([x, x])

        with self.assertRaisesRegex(RuntimeError, "already a kernel registered"):
            lib.impl("foo", foo_impl2, "CPU")

        # Override cpu impl to foo_impl2
        lib.impl(op_name, foo_impl2, "CPU", allow_override=True)
        self.assertEqual(op(torch.ones(3)), torch.ones(6))

    def test_override_fake(self):
        lib = self.lib()
        op_name = f"{self.test_ns}::foo"
        torch.library.define(op_name, "(Tensor x) -> Tensor", lib=lib)
        op = self.ns().foo.default

        def foo_impl1(x):
            return x * 1

        # Register fake kernel to foo_impl1
        torch.library.register_fake(op_name, foo_impl1, lib=lib)

        with torch._subclasses.FakeTensorMode():
            self.assertEqual(op(torch.ones(3)).shape, [3])
        self.assertEqual(op(torch.ones(3, device="meta")).shape, [3])

        def foo_impl2(x):
            return torch.cat([x, x])

        with self.assertRaisesRegex(RuntimeError, "already has an fake impl"):
            torch.library.register_fake(op_name, foo_impl2, lib=lib)

        # Override fake kernel to foo_impl2
        torch.library.register_fake(op_name, foo_impl2, lib=lib, allow_override=True)
        with torch._subclasses.FakeTensorMode():
            self.assertEqual(op(torch.ones(3)).shape, [6])
        self.assertEqual(op(torch.ones(3, device="meta")).shape, [6])

        # Use scoped_library to temporarily register Fake kernel to foo_impl1
        with torch.library._scoped_library(self.test_ns, "FRAGMENT") as lib2:
            torch.library.register_fake(
                op_name, foo_impl1, lib=lib2, allow_override=True
            )
            with torch._subclasses.FakeTensorMode():
                self.assertEqual(op(torch.ones(3)).shape, [3])
            self.assertEqual(op(torch.ones(3, device="meta")).shape, [3])

        # Fake kernel should go back to foo_impl2
        with torch._subclasses.FakeTensorMode():
            self.assertEqual(op(torch.ones(3)).shape, [6])
        self.assertEqual(op(torch.ones(3, device="meta")).shape, [6])

    def test_override_meta(self):
        lib = self.lib()
        op_name = f"{self.test_ns}::foo"
        torch.library.define(op_name, "(Tensor x) -> Tensor", lib=lib)
        op = self.ns().foo.default

        def foo_impl1(x):
            return x * 1

        # Register Meta kernel to foo_impl1
        lib.impl("foo", foo_impl1, "Meta")
        self.assertEqual(op(torch.ones(3, device="meta")).shape, [3])

        def foo_impl2(x):
            return torch.cat([x, x])

        with self.assertRaisesRegex(RuntimeError, "already a kernel registered"):
            lib.impl("foo", foo_impl2, "Meta")

        # Override Meta kernel to foo_impl2
        lib.impl("foo", foo_impl2, "Meta", allow_override=True)
        self.assertEqual(op(torch.ones(3, device="meta")).shape, [6])

        # Use scoped_library to temporarily register Meta kernel to foo_impl1
        with torch.library._scoped_library(self.test_ns, "FRAGMENT") as lib2:
            lib2.impl("foo", foo_impl1, "Meta", allow_override=True)
            self.assertEqual(op(torch.ones(3, device="meta")).shape, [3])

        # Meta kernel should go back to foo_impl2
        self.assertEqual(op(torch.ones(3, device="meta")).shape, [6])

        # Use register_fake to override Meta kernel to foo_impl1
        torch.library.register_fake(op_name, foo_impl1, lib=lib, allow_override=True)
        self.assertEqual(op(torch.ones(3, device="meta")).shape, [3])

    def test_override_cea(self):
        lib = self.lib()
        op_name = f"{self.test_ns}::foo"
        torch.library.define(op_name, "(Tensor x) -> Tensor", lib=lib)
        op = self.ns().foo.default

        def foo_impl1(x):
            return x * 1

        # Register CEA impl to foo_impl1
        lib.impl("foo", foo_impl1, "CompositeExplicitAutograd")
        with torch._subclasses.FakeTensorMode():
            self.assertEqual(op(torch.ones(3)).shape, [3])
        self.assertEqual(op(torch.ones(3, device="meta")).shape, [3])

        def foo_impl2(x):
            return torch.cat([x, x])

        # Override Meta/fake kernel with foo_impl2
        torch.library.register_fake(op_name, foo_impl2, lib=lib, allow_override=True)
        with torch._subclasses.FakeTensorMode():
            self.assertEqual(op(torch.ones(3)).shape, [6])
        self.assertEqual(op(torch.ones(3, device="meta")).shape, [6])

    @scoped_load_inline
    def test_autograd_function_backed_op(self, load_inline):
        cpp_source = """
struct CustomOpAutogradFunction : public torch::autograd::Function<CustomOpAutogradFunction> {
  static constexpr bool is_traceable = true;

  static torch::Tensor forward(
      torch::autograd::AutogradContext* ctx,
      const torch::Tensor& x) {
    return x;
  }

  static torch::autograd::variable_list backward(
      torch::autograd::AutogradContext *ctx,
      torch::autograd::variable_list grad_output) {
    return grad_output;
  }
};

torch::Tensor custom_op_backed_by_autograd_fn(const torch::Tensor& x) {
  return CustomOpAutogradFunction::apply(x);
}

TORCH_LIBRARY(test_autograd_function_backed_op, m) {
    m.def("custom_op_backed_by_autograd_fn", custom_op_backed_by_autograd_fn);
}
        """

        module = load_inline(
            name="test_autograd_function_backed_op",
            cpp_sources=cpp_source,
            functions="custom_op_backed_by_autograd_fn",
            verbose=True,
        )

        x = torch.ones(2, 2, requires_grad=True)
        temp = x.detach().clone()
        out = (
            torch.ops.test_autograd_function_backed_op.custom_op_backed_by_autograd_fn(
                x
            )
        )
        loss = out.sum()
        loss.backward()
        self.assertEqual(x.grad, temp)

    # Using a non-existent DSO is a quick way to trigger an OSError,
    # which can be used to not break BC.
    def test_load_library(self):
        with self.assertRaisesRegex(
            OSError, "Could not load this library: .*libnoexist.so"
        ):
            torch.ops.load_library("libnoexist.so")


def op_with_incorrect_schema(testcase, name):
    lib = testcase.lib()
    lib.define(f"{name}(Tensor x) -> Tensor")
    qualname = f"{testcase.test_ns}::{name}"
    lib.impl(name, lambda x: x[:], "CompositeExplicitAutograd")
    return testcase.get_op(qualname)


class MiniOpTest(CustomOpTestCaseBase):
    test_ns = "mini_op_test"

    def _init_op_delayed_backward_error(self):
        name = "delayed_error"
        qualname = f"{self.test_ns}::{name}"
        lib = self.lib()
        lib.define(f"{name}(Tensor x) -> Tensor")
        lib.impl(name, lambda x: x.clone(), "CompositeExplicitAutograd")
        op = self.get_op(qualname)

        class Op(torch.autograd.Function):
            @staticmethod
            def forward(ctx, x):
                with torch._C._AutoDispatchBelowAutograd():
                    return op(x)

            @staticmethod
            def backward(ctx, grad):
                raise NotImplementedError

        def autograd_impl(x):
            return Op.apply(x)

        lib.impl(name, autograd_impl, "Autograd")
        return op

    def _init_op_with_no_abstract_impl(self):
        name = "no_abstract"
        qualname = f"{self.test_ns}::{name}"
        lib = self.lib()
        lib.define(f"{name}(Tensor x) -> Tensor", tags=(torch.Tag.pt2_compliant_tag,))
        lib.impl(name, lambda x: x.clone(), "CPU")
        return torch._library.utils.lookup_op(qualname)

    def setUp(self):
        super().setUp()
        self._op_with_no_abstract_impl = self._init_op_with_no_abstract_impl()
        self._op_delayed_backward_error = self._init_op_delayed_backward_error()

    @optests.dontGenerateOpCheckTests("Testing this API")
    def test_dont_generate(self):
        op = op_with_incorrect_schema(self, "incorrect_schema")
        x = torch.randn(3)
        op(x)

    def test_mm(self):
        x = torch.randn(2, 3, requires_grad=True)
        y = torch.randn(3, 5)
        result = torch.ops.aten.mm.default(x, y)
        self.assertEqual(result, x @ y)

    def test_mm_meta(self):
        x = torch.randn(2, 3, requires_grad=True, device="meta")
        y = torch.randn(3, 5, device="meta")
        result = torch.ops.aten.mm.default(x, y)
        self.assertEqual(result.shape, (x @ y).shape)

    def test_mm_fake(self):
        with torch._subclasses.fake_tensor.FakeTensorMode():
            x = torch.randn(2, 3, requires_grad=True, device="cpu")
            y = torch.randn(3, 5, device="cpu")
            result = torch.ops.aten.mm.default(x, y)
            self.assertEqual(result.shape, (x @ y).shape)

    def test_mm_errors(self):
        x = torch.randn(2, 3, requires_grad=True)
        y = torch.randn(4, 5)
        with self.assertRaisesRegex(RuntimeError, "cannot be multiplied"):
            result = torch.ops.aten.mm.default(x, y)

    def test_nonzero(self):
        x = torch.tensor([0, 1, 2, 0, 0])
        y = torch.ops.aten.nonzero.default(x)
        self.assertEqual(y, torch.tensor([[1], [2]]))

    def test_inplace(self):
        x = torch.randn(3)
        x_clone = x.clone()
        y = torch.ops.aten.sin_(x)
        self.assertEqual(x, x_clone.sin())

    def test_incorrect_schema(self):
        op = op_with_incorrect_schema(self, "incorrect_schema")
        x = torch.randn(3)
        op(x)

    def test_no_abstract(self):
        op = self._op_with_no_abstract_impl
        x = torch.randn(3)
        op(x)

    def test_delayed_error(self):
        op = self._op_delayed_backward_error
        x = torch.randn([], requires_grad=True)
        y = op(x)
        with self.assertRaises(NotImplementedError):
            y.sum().backward()

    def test_delayed_error_no_requires_grad(self):
        op = self._op_delayed_backward_error
        x = torch.randn([])
        y = op(x)


class TestCustomOpAPI(TestCase):
    @skipIfTorchDynamo("Expected to fail due to no FakeTensor support; not a bug")
    def test_basic(self):
        @torch.library.custom_op("_torch_testing::add", mutates_args=())
        def add(x: Tensor, y: float) -> Tensor:
            x_np = x.numpy(force=True)
            out_np = x_np + y
            return torch.from_numpy(out_np).to(x.device)

        x = torch.randn(3)
        y = 3.14
        z = add(x, y)
        self.assertEqual(z, x + y)

        cpu_called = False

        @add.register_kernel("cpu")
        def _(x, y):
            nonlocal cpu_called
            cpu_called = True
            x_np = x.numpy()
            out_np = x_np + y
            return torch.from_numpy(out_np)

        z = add(x, y)
        self.assertEqual(z, x + y)
        self.assertTrue(cpu_called)

    @skipIfTorchDynamo("Expected to fail due to no FakeTensor support; not a bug")
    def test_no_grad_skips_autograd(self):
        @torch.library.custom_op("_torch_testing::add", mutates_args=())
        def add(x: Tensor, y: float) -> Tensor:
            x_np = x.numpy(force=True)
            out_np = x_np + y
            return torch.from_numpy(out_np).to(x.device)

        called = 0

        def setup_context(ctx, inputs, output):
            nonlocal called
            called += 1

        def backward(ctx, grad):
            raise AssertionError("should not be reached")

        add.register_autograd(backward, setup_context=setup_context)

        x = torch.randn(3, requires_grad=True)
        with torch.no_grad():
            y = add(x, 2.0)
        self.assertEqual(called, 0)
        self.assertEqual(y, x + 2.0)

        x.requires_grad_(False)
        y = add(x, 2.0)
        self.assertEqual(called, 0)
        self.assertEqual(y, x + 2.0)

        x = torch.randn(3, requires_grad=True)
        y = add(x, 2.0)
        self.assertEqual(called, 1)
        self.assertEqual(y, x + 2.0)

    @skipIfTorchDynamo("Expected to fail due to no FakeTensor support; not a bug")
    def test_manual_schema(self):
        @torch.library.custom_op(
            "_torch_testing::add",
            mutates_args=(),
            schema="(Tensor x, float y) -> Tensor",
        )
        def add(x, y):
            x_np = x.numpy(force=True)
            out_np = x_np + y
            return torch.from_numpy(out_np).to(x.device)

        x = torch.randn(3)
        y = 3.14
        z = add(x, y)
        self.assertEqual(z, x + y)

        @torch.library.custom_op(
            "_torch_testing::sin_",
            mutates_args=["x"],
            schema="(Tensor(a!) x) -> ()",
        )
        def sin_(x):
            x_np = x.numpy()
            np.sin(x_np, out=x_np)

        x = torch.randn(3)
        expected = x.sin()
        sin_(x)
        self.assertEqual(x, expected)

    @skipIfTorchDynamo("Expected to fail due to no FakeTensor support; not a bug")
    def test_subclass_accessor_view_error(self):
        @torch.library.custom_op(
            "_torch_testing::_failing_two_tensor_accessor",
            mutates_args=(),
            schema="(Tensor(a) tx, SymInt idx) -> Tensor(a)",
        )
        def _failing_two_tensor_accessor(tx, idx):
            return tx.view_as(tx)

        def noop(*args):
            pass

        _failing_two_tensor_accessor.register_autograd(noop, setup_context=noop)

        t = torch.rand(2)
        with self.assertRaisesRegex(
            RuntimeError, "Custom ops that are views do not support SymInt."
        ):
            torch.ops._torch_testing._failing_two_tensor_accessor(t, 2)

        @torch.library.custom_op(
            "_torch_testing::_failing_two_tensor_accessor_list",
            mutates_args=(),
            schema="(Tensor(a) tx, SymInt[] idx) -> Tensor(a)",
        )
        def _failing_two_tensor_accessor_list(tx, idx):
            return tx.view_as(tx)

        def noop(*args):
            pass

        _failing_two_tensor_accessor_list.register_autograd(noop, setup_context=noop)

        t = torch.rand(2)
        with self.assertRaisesRegex(
            RuntimeError, "Custom ops that are views do not support SymInt."
        ):
            torch.ops._torch_testing._failing_two_tensor_accessor_list(t, (2,))

    @skipIfTorchDynamo("Expected to fail due to no FakeTensor support; not a bug")
    def test_subclass_accessor_view(self):
        class MyTwoTensor(TwoTensor):
            @classmethod
            def __torch_dispatch__(cls, func, types, args, kwargs):
                if func is torch.ops._torch_testing._two_tensor_accessor.default:
                    self.assertIsInstance(args[0], MyTwoTensor)
                    self.assertIn(args[1], (0, 1))
                    if args[1] == 0:
                        res = args[0].a
                    else:
                        res = args[0].b
                    # Always return a fresh Tensor!
                    return res.view_as(res)
                return super().__torch_dispatch__(func, types, args, kwargs)

        @torch.library.custom_op(
            "_torch_testing::_two_tensor_accessor",
            mutates_args=(),
            schema="(Tensor(a) tx, int idx) -> Tensor(a)",
        )
        def _two_tensor_accessor(tx, idx):
            raise RuntimeError("Should never be called")

        def backward(ctx, gO):
            gI = gO.clone()
            if ctx.idx == 0:
                return MyTwoTensor(gI, torch.zeros_like(gO)), None
            else:
                return MyTwoTensor(torch.zeros_like(gO), gI), None

        def setup_ctx(ctx, inputs, output):
            ctx._is_pure_view = True
            ctx.idx = inputs[1]

        _two_tensor_accessor.register_autograd(backward, setup_context=setup_ctx)

        x = torch.rand(3)
        y = torch.rand(3)
        z = MyTwoTensor(x, y, requires_grad=True)
        res = torch.ops._torch_testing._two_tensor_accessor(z, 0)
        res.sum().backward()
        self.assertEqual(res, x)
        self.assertTrue(res._is_view())
        self.assertTrue(res._base is z)
        self.assertEqual(z.grad, torch.ones_like(z.grad))

        res = torch.ops._torch_testing._two_tensor_accessor(z, 1)
        res.sum().backward()
        self.assertEqual(res, y)
        self.assertTrue(res._is_view())
        self.assertTrue(res._base is z)
        self.assertEqual(z.grad, TwoTensor(torch.ones(3), torch.ones(3)))

        leaf = MyTwoTensor(torch.rand(3), torch.rand(3), requires_grad=True)
        non_leaf = leaf.clone()
        view_a = torch.ops._torch_testing._two_tensor_accessor(non_leaf, 0)
        self.assertTrue(view_a._is_view())
        self.assertTrue(view_a._base is non_leaf)
        view_a *= 2
        self.assertEqual(non_leaf.a, view_a)
        self.assertNotEqual(leaf.a, view_a)
        non_leaf.sum().backward()
        self.assertEqual(leaf.grad, MyTwoTensor(2 * torch.ones(3), torch.ones(3)))

    @skipIfTorchDynamo("Expected to fail due to no FakeTensor support; not a bug")
    def test_kwarg_only_tensors(self):
        with self.assertRaisesRegex(NotImplementedError, "kwarg-only Tensor args"):

            @torch.library.custom_op("_torch_testing::foo", mutates_args=())
            def foo(x: Tensor, *, y: int, z: Tensor) -> Tensor:
                pass

        with self.assertRaisesRegex(NotImplementedError, "kwarg-only Tensor args"):

            @torch.library.custom_op("_torch_testing::foo", mutates_args=())
            def foo2(x: Tensor, *, y: int, z: Optional[Tensor]) -> Tensor:
                pass

        with self.assertRaisesRegex(NotImplementedError, "kwarg-only Tensor args"):

            @torch.library.custom_op("_torch_testing::foo", mutates_args=())
            def foo3(x: Tensor, *, y: int, z: List[Tensor]) -> Tensor:
                pass

        with torch.library._scoped_library("_torch_testing", "FRAGMENT") as lib:
            lib.define("foo(Tensor x, *, Tensor y) -> Tensor")
            with self.assertRaisesRegex(NotImplementedError, "kwarg-only Tensor args"):
                torch.library.register_autograd(
                    "_torch_testing::foo",
                    lambda grad: grad,
                    setup_context=lambda ctx, inputs, keyword_only_inputs, output: None,
                )

            with self.assertRaisesRegex(NotImplementedError, "kwarg-only Tensor args"):
                torch.library.register_vmap(
                    "_torch_testing::foo",
                    lambda info, in_dims, x, *, y: (x, 0),
                )

    @skipIfTorchDynamo("Expected to fail due to no FakeTensor support; not a bug")
    def test_register_autograd_kwargonly_low_level(self):
        with torch.library._scoped_library("_torch_testing", "FRAGMENT") as lib:
            lib.define("foo(Tensor x, *, float y) -> Tensor")
            called = False

            def foo_impl(x, *, y):
                return x * y

            lib.impl("foo", foo_impl, "CPU")

            def backward(ctx, grad):
                nonlocal called
                called = True
                return grad * ctx.y

            def setup_context(ctx, inputs, keyword_only_inputs, output):
                assert tuple(keyword_only_inputs.keys()) == ("y",)
                ctx.y = keyword_only_inputs["y"]

            torch.library.register_autograd(
                "_torch_testing::foo", backward, setup_context=setup_context, lib=lib
            )

            x = torch.randn(3, requires_grad=True)
            torch.ops._torch_testing.foo(x, y=3.14).sum().backward()
            self.assertTrue(called)
            self.assertEqual(x.grad, torch.tensor([3.14, 3.14, 3.14]))

    @skipIfTorchDynamo("Expected to fail due to no FakeTensor support; not a bug")
    def test_register_autograd_defaults(self):
        with torch.library._scoped_library("_torch_testing", "FRAGMENT") as lib:
            lib.define("foo(Tensor w, int x = 2, *, int y = 3, int z) -> Tensor")

            def foo_impl(w, x=2, *, y=3, z):
                return w * x * y * z

            lib.impl("foo", foo_impl, "CPU")

            called = False

            def backward(ctx, grad):
                nonlocal called
                called = True
                return grad * ctx.c

            def setup_context(ctx, inputs, keyword_only_inputs, output):
                assert len(inputs) == 2
                assert inputs[1] == 2
                assert keyword_only_inputs == {"y": 3, "z": 42}
                ctx.c = keyword_only_inputs["y"] * keyword_only_inputs["z"] * inputs[1]

            torch.library.register_autograd(
                "_torch_testing::foo", backward, setup_context=setup_context, lib=lib
            )

            w = torch.randn(3, requires_grad=True)
            torch.ops._torch_testing.foo(w, z=42).sum().backward()
            self.assertTrue(called)
            self.assertEqual(w.grad, torch.full_like(w, 2 * 3 * 42))

    @skipIfTorchDynamo("Expected to fail due to no FakeTensor support; not a bug")
    def test_manual_schema_error(self):
        with self.assertRaisesRegex(ValueError, "the op mutates {'x'}"):

            @torch.library.custom_op(
                "_torch_testing::sin_",
                mutates_args=(),
                schema="(Tensor(a!) x) -> ()",
            )
            def sin_(x):
                x_np = x.numpy()
                np.sin(x_np, out=x_np)

    def test_supports_tensorlist(self):
        @torch._library.autograd.supports_tensorlist
        class Stack(torch.autograd.Function):
            @staticmethod
            def forward(ctx, xs):
                ctx.num_xs = len(xs)
                return torch.stack(xs)

            @staticmethod
            def backward(ctx, grad):
                expected = ([True] * ctx.num_xs,)
                self.assertEqual(ctx.needs_input_grad, expected)
                return list(grad.unbind(0))

        # call two applies, do a backward on the first
        def t():
            return torch.randn([], requires_grad=True)

        xs0 = [t(), t(), t()]
        xs1 = [t(), t(), t(), t()]
        y0 = Stack.apply(xs0)
        y1 = Stack.apply(xs1)
        grads = torch.autograd.grad(y0.sum(), xs0)
        self.assertEqual(grads, [torch.tensor(1.0) for _ in range(3)])

        # call one apply, do multiple backwards
        xs = [t(), t(), t()]
        y = Stack.apply(xs)
        _ = torch.autograd.grad(y.sum(), xs, retain_graph=True)
        _ = torch.autograd.grad(y.sum(), xs, retain_graph=True)
        grads = torch.autograd.grad(y.sum(), xs, retain_graph=True)
        self.assertEqual(grads, [torch.tensor(1.0) for _ in range(3)])

        # error: on access forward, backward directly
        with self.assertRaisesRegex(NotImplementedError, "Function.forward directly"):
            Stack.forward(None, xs)
        with self.assertRaisesRegex(NotImplementedError, "Function.backward directly"):
            Stack.backward(None, xs)

        # the recursive case
        @torch._library.autograd.supports_tensorlist
        class Foo(torch.autograd.Function):
            @staticmethod
            def forward(ctx, xs):
                if len(xs) > 1:
                    return Foo.apply(xs[1:])
                ctx.len_xs = len(xs)
                return xs[0].sin()

            @staticmethod
            def backward(ctx, grad):
                result = [None] * ctx.len_xs
                result[-1] = grad.cos()
                return result

        # should work
        result = Foo.apply(xs)
        expected = xs[-1].sin()
        self.assertEqual(result, expected)

        # recursive on backward
        @torch._library.autograd.supports_tensorlist
        class Bar(torch.autograd.Function):
            @staticmethod
            def forward(ctx, xs):
                return [xs[i] + i for i in range(len(xs))]

            @staticmethod
            def backward(ctx, grads):
                f1 = Bar.apply(grads[:2])
                f2 = Bar.apply(grads[2:])
                return f1 + f2

        xs = [torch.tensor(0.0, requires_grad=True) for _ in range(5)]
        ys = Bar.apply(xs)
        sum(ys).backward()
        result = [xi.grad for xi in xs]
        self.assertEqual(result, torch.tensor([1.0, 2, 1, 2, 3]).unbind(0))

    @skipIfTorchDynamo("Expected to fail due to no FakeTensor support; not a bug")
    def test_default_values(self):
        defaults = []

        @torch.library.custom_op("_torch_testing::f", mutates_args=())
        def f(
            x: Tensor,
            a: Optional[int] = None,
            b: float = 3.14,
            c: bool = True,
            d: int = 3,
            e: str = "foo",
            f: torch.dtype = torch.float,
            g: torch.dtype = torch.float32,
            h: torch.dtype = torch.int,
            i: torch.device = torch.device("cpu:0"),
            j: torch.device = "cpu",
        ) -> Tensor:
            defaults.extend([a, b, c, d, e, f, g, h, i, j])
            return x.clone()

        x = torch.randn(3)
        f(x)
        self.assertEqual(
            defaults,
            [
                None,
                3.14,
                True,
                3,
                "foo",
                torch.float,
                torch.float32,
                torch.int,
                torch.device("cpu:0"),
                "cpu",
            ],
        )
        default_values = [
            arg.default_value
            for arg in torch.ops._torch_testing.f.default._schema.arguments
        ]
        # enum values taken from c10/core/ScalarType.h
        type_enum = {
            "float": 6,
            "int": 3,
        }
        self.assertEqual(
            default_values,
            [
                None,
                None,
                3.14,
                True,
                3,
                "foo",
                type_enum["float"],
                type_enum["float"],
                type_enum["int"],
                torch.device("cpu:0"),
                torch.device("cpu"),
            ],
        )

    def test_mutated_error(self):
        with self.assertRaisesRegex(
            ValueError, r".*{'y'} in mutates_args were not found"
        ):

            @torch.library.custom_op(
                "_torch_testing::numpy_sin_inplace",
                mutates_args={"y"},
                device_types="cpu",
            )
            def numpy_sin_inplace(x: Tensor) -> None:
                x_np = x.numpy()
                np.sin(x_np, out=x_np)

    def test_mutated(self):
        @torch.library.custom_op(
            "_torch_testing::numpy_sin_inplace", mutates_args={"x"}, device_types="cpu"
        )
        def numpy_sin_inplace(x: Tensor) -> None:
            x_np = x.numpy()
            np.sin(x_np, out=x_np)

        x = torch.randn(3)
        version = x._version
        expected = x.sin()
        numpy_sin_inplace(x)
        self.assertEqual(x, expected)
        self.assertGreater(x._version, version)

        @torch.library.custom_op("_torch_testing::f", mutates_args={"y", "z", "w"})
        def f(
            x: Tensor, y: Optional[Tensor], z: List[Tensor], w: List[Optional[Tensor]]
        ) -> None:
            return

        x = torch.randn(3)
        y = torch.randn(3)
        z = [torch.randn(3), torch.randn(3)]
        w = [torch.randn(3), None, torch.randn(3)]
        initial_versions = pytree.tree_map_only(
            torch.Tensor, lambda x: x._version, (x, y, z, w)
        )
        f(x, y, z, w)
        new_versions = pytree.tree_map_only(
            torch.Tensor, lambda x: x._version, (x, y, z, w)
        )

        self.assertEqual(initial_versions[0], new_versions[0])
        initial_versions, _ = pytree.tree_flatten(initial_versions[1:])
        new_versions, _ = pytree.tree_flatten(new_versions[1:])
        for prev, after in zip(initial_versions, new_versions):
            if prev is None and after is None:
                continue
            self.assertGreater(after, prev)

    def test_mutated_unknown(self):
        @torch.library.custom_op(
            "_torch_testing::f", mutates_args="unknown", device_types="cpu"
        )
        def f(x: Tensor) -> None:
            x_np = x.numpy()
            np.sin(x_np, out=x_np)

        x = torch.randn(3)
        version = x._version
        expected = x.sin()
        f(x)
        self.assertEqual(x, expected)
        self.assertGreater(x._version, version)

        @torch.library.custom_op("_torch_testing::f2", mutates_args="unknown")
        def f2(
            x: Tensor, y: Optional[Tensor], z: List[Tensor], w: List[Optional[Tensor]]
        ) -> None:
            return

        x = torch.randn(3)
        y = torch.randn(3)
        z = [torch.randn(3), torch.randn(3)]
        w = [torch.randn(3), None, torch.randn(3)]
        initial_versions = pytree.tree_map_only(
            torch.Tensor, lambda x: x._version, (x, y, z, w)
        )
        f2(x, y, z, w)
        new_versions = pytree.tree_map_only(
            torch.Tensor, lambda x: x._version, (x, y, z, w)
        )

        initial_versions, _ = pytree.tree_flatten(initial_versions)
        new_versions, _ = pytree.tree_flatten(new_versions)
        for prev, after in zip(initial_versions, new_versions):
            if prev is None and after is None:
                continue
            self.assertGreater(after, prev)

        with self.assertRaisesRegex(ValueError, "string"):

            @torch.library.custom_op("_torch_testing::f3", mutates_args="x")
            def f3(x: Tensor) -> None:
                return

    @skipIfTorchDynamo("Expected to fail due to no FakeTensor support; not a bug")
    def test_library_register_torch_dispatch_rule_subclass(self):
        from torch.testing._internal.two_tensor import TwoTensor

        @torch.library.custom_op("mylib::foo", mutates_args={})
        def f(x: torch.Tensor) -> torch.Tensor:
            return x.sin()

        x = torch.randn(3)
        y = torch.randn(3)
        z = TwoTensor(x, y)

        with torch.library._scoped_library("mylib", "FRAGMENT") as m:
            called = 0

            def TwoTensor_foo(cls, func, types, args, kwargs):
                nonlocal called
                assert cls is TwoTensor
                called += 1
                return x.sin()

            m._register_torch_dispatch_rule("foo", TwoTensor, TwoTensor_foo)

            out = f(z)
            out2 = z.cos()

        self.assertEqual(called, 1)

    @skipIfTorchDynamo("Expected to fail due to no FakeTensor support; not a bug")
    def test_library_register_torch_dispatch_rule_mode(self):
        from torch.testing._internal.two_tensor import TwoTensorMode

        @torch.library.custom_op("mylib::foo", mutates_args={})
        def f(x: torch.Tensor) -> torch.Tensor:
            return x.sin()

        x = torch.randn(3)

        with torch.library._scoped_library("mylib", "FRAGMENT") as m:
            called = 0

            def TwoTensor_foo(mode, func, types, args, kwargs):
                nonlocal called
                called += 1
                return x.sin()

            m._register_torch_dispatch_rule("foo", TwoTensorMode, TwoTensor_foo)

            with TwoTensorMode():
                out = f(x)
                out2 = x.cos()

        self.assertEqual(called, 1)

    @skipIfTorchDynamo("Expected to fail due to no FakeTensor support; not a bug")
    @parametrize("idx", [0, 1, 2, 3, 4, 5])
    def test_library_register_fake_source(self, idx):
        opname = f"source{idx}"
        op = getattr(torch.ops._torch_testing, opname).default
        entry = torch._library.simple_registry.singleton.find(op._name)
        source = entry.fake_impl.kernel.source
        assert source is not None
        self.assertTrue("custom_op_db.py" in source)

    @skipIfTorchDynamo("Expected to fail due to no FakeTensor support; not a bug")
    def test_library_register_fake(self):
        for mode in ["function", "qualname", "opoverload"]:

            @torch.library.custom_op("_torch_testing::add", mutates_args=())
            def add(x: Tensor, y: float) -> Tensor:
                x_np = x.cpu().numpy()
                out_np = x_np + y
                return torch.from_numpy(out_np).to(x.device)

            called = False

            if mode == "function":
                dec = torch.library.register_fake(add)
                self.assertIsNotNone(dec)
            elif mode == "qualname":
                dec = torch.library.register_fake("_torch_testing::add")
                self.assertIsNotNone(dec)
            elif mode == "opoverload":
                dec = torch.library.register_fake(torch.ops._torch_testing.add.default)
                self.assertIsNotNone(dec)
            else:
                raise AssertionError("should not get here")

            @dec
            def _(x, y):
                nonlocal called
                called = True
                return torch.empty_like(x)

            with torch._subclasses.fake_tensor.FakeTensorMode():
                x = torch.randn(3)
                y = 3.14
                z = add(x, y)
                self.assertEqual(z.shape, x.shape)
                self.assertTrue(called)

    @skipIfTorchDynamo("Expected to fail due to no FakeTensor support; not a bug")
    def test_library_register_torch_dispatch(self):
        for mode in ["function", "qualname", "opoverload"]:

            class MyMode(torch.utils._python_dispatch.TorchDispatchMode):
                def __torch_dispatch__(self, func, types, args=(), kwargs=None):
                    return func(*args, **kwargs)

            @torch.library.custom_op("_torch_testing::add", mutates_args=())
            def add(x: Tensor, y: float) -> Tensor:
                x_np = x.cpu().numpy()
                out_np = x_np + y
                return torch.from_numpy(out_np).to(x.device)

            called = False

            if mode == "function":
                dec = torch.library.register_torch_dispatch(add, MyMode)
                self.assertIsNotNone(dec)
            elif mode == "qualname":
                dec = torch.library.register_torch_dispatch(
                    "_torch_testing::add", MyMode
                )
                self.assertIsNotNone(dec)
            elif mode == "opoverload":
                dec = torch.library.register_torch_dispatch(
                    torch.ops._torch_testing.add.default, MyMode
                )
                self.assertIsNotNone(dec)
            else:
                raise AssertionError("should not get here")

            @dec
            def _(mode, func, types, args, kwargs):
                nonlocal called
                called = True
                return func(*args, **kwargs)

            with MyMode():
                x = torch.randn(3)
                y = 3.14
                z = add(x, y)
                self.assertEqual(z.shape, x.shape)
                self.assertTrue(called)

    @skipIfTorchDynamo("Expected to fail due to no FakeTensor support; not a bug")
    def test_library_register_torch_dispatch_low_level(self):
        modes = ["qualname", "opoverload"]
        calls = ["decorator", "function"]
        device_types_options = [("cpu", "cuda"), "cpu", None]

        for mode, call, device_types in itertools.product(
            modes, calls, device_types_options
        ):
            with torch.library._scoped_library("_torch_testing", "FRAGMENT") as lib:
                lib.define("add10(Tensor x, float y) -> Tensor")

                if mode == "qualname":
                    op = "_torch_testing::add10"
                else:
                    assert mode == "opoverload"
                    op = torch.ops._torch_testing.add10.default

                called = False

                class MyMode(torch.utils._python_dispatch.TorchDispatchMode):
                    def __torch_dispatch__(self, func, types, args=(), kwargs=None):
                        return func(*args, **kwargs)

                if call == "decorator":

                    @torch.library.register_torch_dispatch(op, MyMode, lib=lib)
                    def _(mode, func, types, args, kwargs):
                        x, y = args
                        nonlocal called
                        called = True
                        return x + y

                else:
                    assert call == "function"

                    def add_stuff(mode, func, types, args, kwargs):
                        x, y = args
                        nonlocal called
                        called = True
                        return x + y

                    torch.library.register_torch_dispatch(
                        op, MyMode, add_stuff, lib=lib
                    )

                x = torch.randn(3)
                y = 3.14
                with MyMode():
                    z = torch.ops._torch_testing.add10.default(x, y)
                self.assertEqual(z, x + y)
                self.assertTrue(called)

    @skipIfTorchDynamo("Expected to fail due to no FakeTensor support; not a bug")
    def test_library_register_kernel(self):
        modes = ["function", "qualname", "opoverload"]
        calls = ["decorator", "function"]
        device_types_options = ["cpu", None]

        for mode, call, device_types in itertools.product(
            modes, calls, device_types_options
        ):

            @torch.library.custom_op(
                "_torch_testing::add", mutates_args=(), device_types="cuda"
            )
            def add(x: Tensor, y: float) -> Tensor:
                x_np = x.cpu().numpy()
                out_np = x_np + y
                return torch.from_numpy(out_np).to(x.device)

            if mode == "function":
                op = add
            elif mode == "qualname":
                op = "_torch_testing::add"
            else:
                assert mode == "opoverload"
                op = torch.ops._torch_testing.add.default

            called = False

            if call == "decorator":

                @torch.library.register_kernel(op, device_types)
                def _(x, y):
                    nonlocal called
                    called = True
                    x_np = x.numpy()
                    out_np = x_np + y
                    return torch.from_numpy(out_np)

            else:
                assert call == "function"

                def add_cpu(x, y):
                    nonlocal called
                    called = True
                    x_np = x.numpy()
                    out_np = x_np + y
                    return torch.from_numpy(out_np)

                torch.library.register_kernel(op, device_types, add_cpu)

            x = torch.randn(3)
            y = 3.14
            z = add(x, y)
            self.assertEqual(z, x + y)
            self.assertTrue(called)

    @skipIfTorchDynamo("Expected to fail due to no FakeTensor support; not a bug")
    def test_library_register_kernel_low_level(self):
        modes = ["qualname", "opoverload"]
        calls = ["decorator", "function"]
        device_types_options = [("cpu", "cuda"), "cpu", None]

        for mode, call, device_types in itertools.product(
            modes, calls, device_types_options
        ):
            with torch.library._scoped_library("_torch_testing", "FRAGMENT") as lib:
                lib.define("add9(Tensor x, float y) -> Tensor")

                if mode == "qualname":
                    op = "_torch_testing::add9"
                else:
                    assert mode == "opoverload"
                    op = torch.ops._torch_testing.add9.default

                called = False

                if call == "decorator":

                    @torch.library.register_kernel(op, device_types, lib=lib)
                    def _(x, y):
                        nonlocal called
                        called = True
                        x_np = x.numpy()
                        out_np = x_np + y
                        return torch.from_numpy(out_np)

                else:
                    assert call == "function"

                    def add_cpu(x, y):
                        nonlocal called
                        called = True
                        x_np = x.numpy()
                        out_np = x_np + y
                        return torch.from_numpy(out_np)

                    torch.library.register_kernel(op, device_types, add_cpu, lib=lib)

                x = torch.randn(3)
                y = 3.14
                z = torch.ops._torch_testing.add9.default(x, y)
                self.assertEqual(z, x + y)
                self.assertTrue(called)

    @skipIfTorchDynamo("Expected to fail due to no FakeTensor support; not a bug")
    @unittest.skipIf(not TEST_CUDA, "requires CUDA")
    def test_library_register_autocast(self):
        for device in ["cuda", "cpu"]:
            for mode in ["function", "qualname", "opoverload"]:

                @torch.library.custom_op("mylib::my_sin", mutates_args=())
                def my_sin(x: Tensor) -> Tensor:
                    return torch.sin(x)

                if mode == "function":
                    torch.library.register_autocast(my_sin, device, torch.float16)
                elif mode == "qualname":
                    torch.library.register_autocast(
                        "mylib::my_sin", device, torch.float16
                    )
                elif mode == "opoverload":
                    torch.library.register_autocast(
                        torch.ops.mylib.my_sin.default, device, torch.float16
                    )

                x = torch.randn(3, dtype=torch.float32, device=device)
                with torch.autocast(device, dtype=torch.float16):
                    y = torch.ops.mylib.my_sin(x)
                self.assertEqual(y.dtype, torch.float16)

    @skipIfTorchDynamo("Expected to fail due to no FakeTensor support; not a bug")
    @unittest.skipIf(not TEST_CUDA, "requires CUDA")
    def test_library_register_autocast_low_level(self):
        for device in ["cuda", "cpu"]:
            for mode in ["qualname", "opoverload"]:
                with torch.library._scoped_library("_torch_testing", "FRAGMENT") as lib:
                    lib.define("my_sin(Tensor x) -> Tensor")

                    def my_sin(x: Tensor) -> Tensor:
                        return torch.sin(x)

                    lib.impl("my_sin", my_sin, device.upper())

                    if mode == "qualname":
                        torch.library.register_autocast(
                            "_torch_testing::my_sin", device, torch.float16, lib=lib
                        )
                    elif mode == "opoverload":
                        torch.library.register_autocast(
                            torch.ops._torch_testing.my_sin.default,
                            device,
                            torch.float16,
                            lib=lib,
                        )

                    x = torch.randn(3, dtype=torch.float32, device=device)
                    with torch.autocast(device, dtype=torch.float16):
                        y = torch.ops._torch_testing.my_sin(x)
                    self.assertEqual(y.dtype, torch.float16)

    @skipIfTorchDynamo("Expected to fail due to no FakeTensor support; not a bug")
    @unittest.skipIf(not TEST_CUDA, "requires CUDA")
    def test_library_register_autocast_list_input(self):
        for device in ["cuda", "cpu"]:
            for mode in ["function", "qualname", "opoverload"]:

                @torch.library.custom_op("mylib::my_add_sin", mutates_args=())
                def my_add_sin(x: List[Tensor]) -> Tensor:
                    return torch.sin(x[0] + x[1])

                if mode == "function":
                    torch.library.register_autocast(my_add_sin, device, torch.float16)
                elif mode == "qualname":
                    torch.library.register_autocast(
                        "mylib::my_add_sin", device, torch.float16
                    )
                elif mode == "opoverload":
                    torch.library.register_autocast(
                        torch.ops.mylib.my_add_sin.default, device, torch.float16
                    )

                lst = [
                    torch.randn(3, dtype=torch.float32, device=device) for _ in range(2)
                ]
                with torch.autocast(device, dtype=torch.float16):
                    y = torch.ops.mylib.my_add_sin(lst)
                self.assertEqual(y.dtype, torch.float16)

    @skipIfTorchDynamo("Expected to fail due to no FakeTensor support; not a bug")
    @unittest.skipIf(not TEST_CUDA, "requires CUDA")
    def test_library_register_autocast_multiple_times(self):
        for device in ["cuda", "cpu"]:

            @torch.library.custom_op("mylib::my_sin", mutates_args=())
            def my_sin(x: Tensor) -> Tensor:
                return torch.sin(x)

            torch.library.register_autocast(my_sin, device, torch.float16)

            x = torch.randn(3, dtype=torch.float32, device=device)
            with torch.autocast(device, dtype=torch.float16):
                y1 = my_sin(x)
            self.assertEqual(y1.dtype, torch.float16)

            # Ensure calling register_autocast multiple times does not error out.
            torch.library.register_autocast(my_sin, device, torch.float16)

            with torch.autocast(device, dtype=torch.float16):
                y2 = my_sin(x)
            self.assertEqual(y2.dtype, torch.float16)

    @skipIfTorchDynamo("Expected to fail due to no FakeTensor support; not a bug")
    @unittest.skipIf(not TEST_CUDA, "requires CUDA")
    def test_library_register_autocast_multiple_times_different_devices(self):
        @torch.library.custom_op("mylib::my_sin", mutates_args=())
        def my_sin(x: Tensor) -> Tensor:
            return torch.sin(x)

        # Register autocast for CUDA
        torch.library.register_autocast(my_sin, "cuda", torch.float16)

        x1 = torch.randn(3, dtype=torch.float32, device="cuda")
        with torch.autocast("cuda", dtype=torch.float16):
            y1 = my_sin(x1)
        self.assertEqual(y1.dtype, torch.float16)

        # Register autocast for CPU
        torch.library.register_autocast(my_sin, "cpu", torch.float16)

        x2 = torch.randn(3, dtype=torch.float32, device="cpu")
        with torch.autocast("cpu", dtype=torch.float16):
            y2 = my_sin(x2)
        self.assertEqual(y2.dtype, torch.float16)

        # Register CUDA autocast for the second time
        torch.library.register_autocast(my_sin, "cuda", torch.float16)

        with torch.autocast("cuda", dtype=torch.float16):
            y3 = my_sin(x1)
        self.assertEqual(y3.dtype, torch.float16)

        # Register CPU autocast for the second time
        torch.library.register_autocast(my_sin, "cpu", torch.float16)

        with torch.autocast("cpu", dtype=torch.float16):
            y4 = my_sin(x2)
        self.assertEqual(y4.dtype, torch.float16)

    @skipIfTorchDynamo("Expected to fail due to no FakeTensor support; not a bug")
    def test_library_register_autograd(self):
        for mode in ["function", "qualname", "opoverload"]:

            @torch.library.custom_op("mylib::numpy_sin", mutates_args=())
            def numpy_sin(x: Tensor) -> Tensor:
                x_np = x.cpu().numpy()
                y_np = np.sin(x_np)
                return torch.from_numpy(y_np).to(device=x.device)

            def setup_context(ctx, inputs, output) -> Tensor:
                (x,) = inputs
                ctx.save_for_backward(x)

            called = False

            def backward(ctx, grad):
                nonlocal called
                called = True
                (x,) = ctx.saved_tensors
                return grad * x.cos()

            if mode == "function":
                torch.library.register_autograd(
                    numpy_sin, backward, setup_context=setup_context
                )
            elif mode == "qualname":
                torch.library.register_autograd(
                    "mylib::numpy_sin", backward, setup_context=setup_context
                )
            elif mode == "opoverload":
                torch.library.register_autograd(
                    torch.ops.mylib.numpy_sin.default,
                    backward,
                    setup_context=setup_context,
                )

            x = torch.randn(3, requires_grad=True)
            y = numpy_sin(x)
            (grad_x,) = torch.autograd.grad(y, x, torch.ones_like(y))
            self.assertTrue(called)
            self.assertEqual(grad_x, x.cos())

    @skipIfTorchDynamo("Expected to fail due to no FakeTensor support; not a bug")
    def test_library_register_autograd_low_level(self):
        for mode in ["qualname", "opoverload"]:
            with torch.library._scoped_library("_torch_testing", "FRAGMENT") as lib:
                lib.define("sin5(Tensor x) -> Tensor")

                def numpy_sin(x: Tensor) -> Tensor:
                    x_np = x.cpu().detach().numpy()
                    y_np = np.sin(x_np)
                    return torch.from_numpy(y_np).to(device=x.device)

                def setup_context(ctx, inputs, output) -> Tensor:
                    (x,) = inputs
                    ctx.save_for_backward(x)

                called = False

                def backward(ctx, grad):
                    nonlocal called
                    called = True
                    (x,) = ctx.saved_tensors
                    return grad * x.cos()

                lib.impl("sin5", numpy_sin, "CPU")

                called = False

                if mode == "qualname":
                    torch.library.register_autograd(
                        "_torch_testing::sin5",
                        backward,
                        setup_context=setup_context,
                        lib=lib,
                    )
                elif mode == "opoverload":
                    torch.library.register_autograd(
                        torch.ops._torch_testing.sin5.default,
                        backward,
                        setup_context=setup_context,
                        lib=lib,
                    )
                x = torch.randn(3, requires_grad=True)
                y = torch.ops._torch_testing.sin5(x)
                (grad_x,) = torch.autograd.grad(y, x, torch.ones_like(y))
                self.assertTrue(called)
                self.assertEqual(grad_x, x.cos())

    @skipIfTorchDynamo("Expected to fail due to no FakeTensor support; not a bug")
    def test_fake(self):
        @torch.library.custom_op("_torch_testing::add", mutates_args=())
        def add(x: Tensor, y: float) -> Tensor:
            x_np = x.cpu().numpy()
            out_np = x_np + y
            return torch.from_numpy(out_np).to(x.device)

        x = torch.randn(3)
        y = 3.14
        z = add(x, y)
        self.assertEqual(z, x + y)

        try:
            with torch._subclasses.fake_tensor.FakeTensorMode():
                x = torch.randn(3)
                add(x, y)
            raise AssertionError("should not be hit")
        except RuntimeError as e:
            abstract_impl_error_msg = str(e)
        abstract_impl_error_msg = re.sub(
            r"0x.*>\)>", "0xDEADBEEF>)>", abstract_impl_error_msg
        ).replace(". ", ".\n")
        self.assertExpectedInline(
            abstract_impl_error_msg,
            """\
There was no fake impl registered for <CustomOpDef(_torch_testing::add)>.
This is necessary for torch.compile/export/fx tracing to work.
Please use `add.register_fake` to add an fake impl.""",
        )

        if not IS_WINDOWS:

            @torch.compile(backend="eager")
            def f(x, y):
                return add(x, y)

            x = torch.randn(3)
            with self.assertRaisesRegex(RuntimeError, "no fake impl"):
                f(x, y)

        abstract_called = False

        @add.register_fake
        def _(x, y):
            nonlocal abstract_called
            abstract_called = True
            return torch.empty_like(x)

        with torch._subclasses.fake_tensor.FakeTensorMode():
            x = torch.randn(3)
            z = add(x, y)
            self.assertEqual(z.shape, x.shape)
            self.assertTrue(abstract_called)

    @skipIfTorchDynamo("recursive dynamo")
    @unittest.skipIf(IS_WINDOWS, "torch.compile doesn't work on windows")
    def test_compile(self):
        called_impl = False
        called_abstract = False

        @torch.library.custom_op("_torch_testing::linear", mutates_args=())
        def custom_linear(x: Tensor, weight: Tensor, bias: Tensor) -> Tensor:
            nonlocal called_impl
            called_impl = True
            x_np = x.numpy()
            w_np = weight.numpy()
            b_np = bias.numpy()
            out_np = np.add(x_np @ w_np.T, bias)
            return out_np

        @custom_linear.register_fake
        def _(x, weight, bias):
            nonlocal called_abstract
            called_abstract = True
            assert x.dim() == 2
            assert weight.dim() == 2
            assert bias.dim() == 1
            assert x.shape[1] == weight.shape[1]
            assert weight.shape[0] == bias.shape[0]
            assert x.device == weight.device
            return x.new_empty(x.size(0), weight.size(0))

        x = torch.randn(2, 2)
        weight = torch.randn(2, 2)
        bias = torch.randn(2)
        out = torch.compile(custom_linear, backend="eager", fullgraph=True)(
            x, weight, bias
        )
        self.assertEqual(out, torch.nn.functional.linear(x, weight, bias))
        self.assertTrue(called_impl)
        self.assertTrue(called_abstract)

    @skipIfTorchDynamo("Expected to fail due to no FakeTensor support; not a bug")
    def test_register_autograd_error_cases(self):
        @torch.library.custom_op("_torch_testing::g", mutates_args=())
        def g(x: Tensor) -> Tensor:
            return x.sin()

        x = torch.randn(3, requires_grad=True)
        y = g(x)
        with self.assertRaisesRegex(RuntimeError, "no autograd formula"):
            y.sum().backward()

    @skipIfTorchDynamo("Expected to fail due to no FakeTensor support; not a bug")
    def test_replacement(self):
        @torch.library.custom_op("_torch_testing::f", mutates_args=())
        def f(x: Tensor) -> Tensor:
            return x.sin()

        x = torch.randn(3)
        y = f(x)
        self.assertEqual(y, x.sin())

        @torch.library.custom_op("_torch_testing::f", mutates_args=())
        def f(x: Tensor) -> Tensor:
            return x.cos()

        y = f(x)
        self.assertEqual(y, x.cos())

    @skipIfTorchDynamo("Expected to fail due to no FakeTensor support; not a bug")
    @unittest.skipIf(not TEST_CUDA, "requires CUDA")
    def test_split_device(self):
        cpu_call_count = 0
        cuda_call_count = 0

        @torch.library.custom_op(
            "_torch_testing::f", mutates_args=(), device_types="cpu"
        )
        def f(x: Tensor) -> Tensor:
            nonlocal cpu_call_count
            cpu_call_count += 1
            x_np = x.numpy()
            out_np = np.sin(x_np)
            return torch.from_numpy(out_np)

        @f.register_kernel("cuda")
        def _(x: Tensor) -> Tensor:
            nonlocal cuda_call_count
            cuda_call_count += 1
            x_np = x.cpu().numpy()
            out_np = np.sin(x_np)
            return torch.from_numpy(out_np).to(x.device)

        x = torch.randn(3)
        y = f(x)
        self.assertEqual(y, x.sin())
        self.assertEqual(cpu_call_count, 1)
        self.assertEqual(cuda_call_count, 0)

        x = x.cuda()
        y = f(x)
        self.assertEqual(y, x.sin())
        self.assertEqual(cpu_call_count, 1)
        self.assertEqual(cuda_call_count, 1)

    @skipIfTorchDynamo("Expected to fail due to no FakeTensor support; not a bug")
    @unittest.skipIf(not TEST_CUDA, "requires CUDA")
    def test_multi_types(self):
        @torch.library.custom_op(
            "_torch_testing::f", mutates_args=(), device_types=("cpu", "cuda")
        )
        def f(x: Tensor) -> Tensor:
            x_np = x.cpu().numpy()
            out_np = np.sin(x_np)
            return torch.from_numpy(out_np).to(x.device)

        x = torch.randn(3)
        y = f(x)
        self.assertEqual(y, x.sin())
        x = x.cuda()
        y = f(x)
        self.assertEqual(y, x.sin())

    @skipIfTorchDynamo("Expected to fail due to no FakeTensor support; not a bug")
    def test_overloading(self):
        called_f = 0
        called_f1 = 0

        @torch.library.custom_op("_torch_testing::f", mutates_args=())
        def f(x: Tensor) -> Tensor:
            nonlocal called_f
            called_f += 1
            return x.clone()

        x = torch.randn(2, 3)
        torch.ops._torch_testing.f(x)
        self.assertEqual(called_f, 1)

        @torch.library.custom_op("_torch_testing::f.overload", mutates_args=())
        def f1(x: Tensor, y: Tensor) -> Tensor:
            nonlocal called_f1
            called_f1 += 1
            return x.clone()

        torch.ops._torch_testing.f(x, x)
        self.assertEqual(called_f1, 1)

    def test_disallows_output_aliasing(self):
        @torch.library.custom_op("_torch_testing::f", mutates_args=())
        def f(x: Tensor) -> Tensor:
            return x.view(-1)

        x = torch.randn(3)
        with self.assertRaisesRegex(RuntimeError, "may not alias"):
            f(x)

        @torch.library.custom_op("_torch_testing::f", mutates_args=())
        def f(x: Tensor) -> Tensor:
            return x

        x = torch.randn(3)
        with self.assertRaisesRegex(RuntimeError, "may not alias"):
            f(x)

        @torch.library.custom_op(
            "_torch_testing::f", mutates_args={"x"}, device_types="cpu"
        )
        def numpy_sin_inplace(x: Tensor) -> Tensor:
            x_np = x.numpy()
            np.sin(x_np, out=x_np)
            return x

        x = torch.randn(3)
        with self.assertRaisesRegex(RuntimeError, "may not alias"):
            numpy_sin_inplace(x)

    @skipIfTorchDynamo("Expected to fail due to no FakeTensor support; not a bug")
    def test_factory_function(self):
        @torch.library.custom_op(
            "_torch_testing::f", mutates_args={}, device_types="cpu"
        )
        def f(device: torch.device) -> Tensor:
            return torch.ones(3)

        result = f(device="cpu")
        self.assertEqual(result.device, torch.device("cpu"))
        self.assertEqual(result, torch.ones(3))

        with self.assertRaisesRegex(
            RuntimeError, "f does not have a kernel registered for cuda"
        ):
            f("cuda")

        with self.assertRaisesRegex(
            ValueError,
            "Functions without tensor inputs are required to have a `device: torch.device` argument",
        ):

            @torch.library.custom_op(
                "_torch_testing::f2", mutates_args={}, device_types="cpu"
            )
            def f2() -> Tensor:
                return torch.ones(3)

        @torch.library.custom_op("_torch_testing::f3", mutates_args={})
        def f3() -> Tensor:
            raise NotImplementedError("NYI")

        with self.assertRaisesRegex(
            ValueError,
            "Functions without tensor inputs are required to have a `device: torch.device` argument",
        ):

            @f3.register_kernel("cpu")
            def _():
                return torch.zeros(3)

            result = f(x)

        @torch.library.custom_op("_torch_testing::f4", mutates_args={})
        def f4(device: torch.device) -> Tensor:
            raise NotImplementedError("NYI")

        @f4.register_kernel("cpu")
        def _(device: torch.device):
            return torch.zeros(3)

        result = f(device="cpu")
        self.assertEqual(result.device, torch.device("cpu"))
        self.assertEqual(result, torch.ones(3))

    def test_library_schema_infer(self):
        def foo_impl(x: torch.Tensor) -> torch.Tensor:
            return x.sin()

        schema = torch.library.infer_schema(foo_impl, op_name="myop", mutates_args={})
        self.assertExpectedInline(schema, "myop(Tensor x) -> Tensor")

        schema = torch.library.infer_schema(foo_impl, mutates_args={})
        self.assertExpectedInline(schema, "(Tensor x) -> Tensor")

    @skipIfTorchDynamo("Expected to fail due to no FakeTensor support; not a bug")
    def test_set_kernel_enabled(self):
        x = torch.ones(1)

        @torch.library.custom_op("mylib::f", mutates_args=())
        def f(x: Tensor) -> Tensor:
            return x + 1

        self.assertEqual(f(x), x + 1)
        with self.assertLogs("torch._library.custom_ops") as captured:
            with f.set_kernel_enabled("gpu", enabled=False):
                self.assertEqual(f(x), x + 1)
            self.assertIn(
                "no kernel was registered for this device type", captured.output[0]
            )

        @f.register_kernel("cpu")
        def _(x):
            return x + 2

        self.assertEqual(f(x), x + 2)

        with self.assertLogs("torch._library.custom_ops") as captured:
            with f.set_kernel_enabled("cpu", enabled=True):
                self.assertEqual(f(x), x + 2)
            self.assertIn("already enabled", captured.output[0])

        with f.set_kernel_enabled("cpu", enabled=False):
            self.assertEqual(f(x), x + 1)

            with self.assertLogs("torch._library.custom_ops") as captured:
                with f.set_kernel_enabled("cpu", enabled=False):
                    self.assertEqual(f(x), x + 1)
                self.assertIn("already disabled", captured.output[0])

            self.assertEqual(f(x), x + 1)

        with f.set_kernel_enabled("cpu", enabled=True):
            self.assertEqual(f(x), x + 2)

        with f.set_kernel_enabled("cpu", enabled=False):
            self.assertEqual(f(x), x + 1)

    @skipIfTorchDynamo("Expected to fail due to no FakeTensor support; not a bug")
    def test_register_vmap_kwargonly_low_level(self):
        with torch.library._scoped_library("_torch_testing", "FRAGMENT") as lib:
            lib.define("foo(Tensor x, *, float y) -> Tensor")
            called = False

            def foo_impl(x, *, y):
                return x * y

            lib.impl("foo", foo_impl, "CPU")

            def vmap(info, in_dims, x, *, y):
                nonlocal called
                called = True
                return x * y, 0

            torch.library.register_vmap("_torch_testing::foo", vmap, lib=lib)

            x = torch.ones(3)
            result = torch.vmap(torch.ops._torch_testing.foo)(x, y=3.14)
            self.assertTrue(called)
            self.assertEqual(result, torch.tensor([3.14, 3.14, 3.14]))

    @skipIfTorchDynamo("Expected to fail due to no FakeTensor support; not a bug")
    def test_register_vmap_defaults(self):
        with torch.library._scoped_library("_torch_testing", "FRAGMENT") as lib:
            lib.define("foo(Tensor w, int x = 2, *, int y = 3, int z) -> Tensor")

            def foo_impl(w, x=2, *, y=3, z):
                return w * x * y * z

            lib.impl("foo", foo_impl, "CPU")

            called = False

            def vmap(info, in_dims, w, x=2, *, y=3, z):
                nonlocal called
                called = True
                return w * x * y * z, 0

            torch.library.register_vmap("_torch_testing::foo", vmap, lib=lib)

            w = torch.ones(3)
            result = torch.vmap(torch.ops._torch_testing.foo)(w, z=42)
            self.assertTrue(called)
            self.assertEqual(result, w * 2 * 3 * 42)

    def test_layout_constraint_tags(self):
        needs_exact_strides = torch._C.Tag.needs_exact_strides
        needs_fixed_stride_order = torch._C.Tag.needs_fixed_stride_order
        flexible_layout = torch._C.Tag.flexible_layout
        # (tags, the result of the tag inference)
        tests = [
            ({needs_fixed_stride_order}, needs_fixed_stride_order),
            ({flexible_layout}, flexible_layout),
            # If no tags are provided, then the following is the default
            (set(), needs_exact_strides),
            # If multiple tags are provided, then we use the most constrained tag.
            ({flexible_layout, needs_fixed_stride_order}, needs_fixed_stride_order),
        ]
        from torch._library.utils import get_layout_constraint_tag

        for tags, expected in tests:
            with torch.library._scoped_library("mylib", "FRAGMENT") as m:
                m.define("foobar(Tensor x) -> Tensor", tags=tags)
                result = get_layout_constraint_tag(torch.ops.mylib.foobar.default)
                self.assertEqual(result, expected)

    @skipIfTorchDynamo("Expected to fail due to no FakeTensor support; not a bug")
    def test_library_register_vmap(self):
        for mode in ["function", "qualname", "opoverload", "c_opdef"]:

            @torch.library.custom_op("mylib::f", mutates_args=())
            def f(x: Tensor, y: Tensor) -> Tensor:
                return x * y

            called = False

            def fvmap(info, in_dims, x, y):
                nonlocal called
                called = True
                x_bdim, y_bdim = in_dims
                x = x.movedim(x_bdim, -1) if x_bdim is not None else x.unsqueeze(-1)
                y = y.movedim(y_bdim, -1) if y_bdim is not None else y.unsqueeze(-1)
                result = x * y
                result = result.movedim(-1, 0)
                return result, 0

            if mode == "function":
                torch.library.register_vmap(f, fvmap)
            elif mode == "qualname":
                torch.library.register_vmap("mylib::f", fvmap)
            elif mode == "opoverload":
                torch.library.register_vmap(torch.ops.mylib.f.default, fvmap)
            elif mode == "c_opdef":
                f.register_vmap(fvmap)

            x = torch.randn(2, 2)
            y = torch.randn(2, 2)

            result = torch.vmap(f)(x, y)
            self.assertTrue(called)
            self.assertEqual(result, x * y)

            called = False
            result = torch.vmap(f, out_dims=1)(x, y)
            self.assertEqual(result, (x * y).T)
            self.assertTrue(called)

            called = False
            result = torch.vmap(f, in_dims=1)(x, y)
            self.assertEqual(result, (x * y).T)
            self.assertTrue(called)

    @skipIfTorchDynamo("Expected to fail due to no FakeTensor support; not a bug")
    def test_library_register_vmap_library_decorator(self):
        @torch.library.custom_op("mylib::f", mutates_args=())
        def f(x: Tensor, y: Tensor) -> Tensor:
            return x * y

        called = False

        @torch.library.register_vmap("mylib::f")
        def fvmap(info, in_dims, x, y):
            nonlocal called
            called = True
            x_bdim, y_bdim = in_dims
            x = x.movedim(x_bdim, -1) if x_bdim is not None else x.unsqueeze(-1)
            y = y.movedim(y_bdim, -1) if y_bdim is not None else y.unsqueeze(-1)
            result = x * y
            result = result.movedim(-1, 0)
            return result, 0

        x = torch.randn(2, 2)
        y = torch.randn(2, 2)

        result = torch.vmap(f)(x, y)
        self.assertTrue(called)
        self.assertEqual(result, x * y)

        x = torch.randn(3)
        y = torch.randn(3)
        result = torch.vmap(torch.vmap(f, in_dims=(0, None)), in_dims=(None, 0))(x, y)
        self.assertEqual(result, y.unsqueeze(-1) * x)

    @skipIfTorchDynamo("Expected to fail due to no FakeTensor support; not a bug")
    def test_library_register_vmap_op_decorator(self):
        @torch.library.custom_op("mylib::f", mutates_args=())
        def f(x: Tensor, y: Tensor) -> Tensor:
            return x * y

        called = False

        @f.register_vmap
        def fvmap(info, in_dims, x, y):
            nonlocal called
            called = True
            x_bdim, y_bdim = in_dims
            x = x.movedim(x_bdim, -1) if x_bdim is not None else x.unsqueeze(-1)
            y = y.movedim(y_bdim, -1) if y_bdim is not None else y.unsqueeze(-1)
            result = x * y
            result = result.movedim(-1, 0)
            return result, 0

        x = torch.randn(2, 2)
        y = torch.randn(2, 2)

        result = torch.vmap(f)(x, y)
        self.assertTrue(called)
        self.assertEqual(result, x * y)

        x = torch.randn(3)
        y = torch.randn(2)
        result = torch.vmap(torch.vmap(f, in_dims=(0, None)), in_dims=(None, 0))(x, y)
        self.assertEqual(result, y.unsqueeze(-1) * x)

    @skipIfTorchDynamo("Expected to fail due to no FakeTensor support; not a bug")
    def test_library_register_vmap_register_multiple_times(self):
        @torch.library.custom_op("mylib::f", mutates_args=())
        def f(x: Tensor, y: Tensor) -> Tensor:
            return x * y

        called = False

        @f.register_vmap
        def fvmap(info, in_dims, x, y):
            nonlocal called
            called = True
            x_bdim, y_bdim = in_dims
            x = x.movedim(x_bdim, -1) if x_bdim is not None else x.unsqueeze(-1)
            y = y.movedim(y_bdim, -1) if y_bdim is not None else y.unsqueeze(-1)
            result = x * y
            result = result.movedim(-1, 0)
            return result, 0

        x = torch.randn(2, 2)
        y = torch.randn(2, 2)

        result = torch.vmap(f)(x, y)
        self.assertTrue(called)
        self.assertEqual(result, x * y)
        called = False

        @f.register_vmap
        def fvmap2(info, in_dims, x, y):
            nonlocal called
            called = True
            x_bdim, y_bdim = in_dims
            x = x.movedim(x_bdim, -1) if x_bdim is not None else x.unsqueeze(-1)
            y = y.movedim(y_bdim, -1) if y_bdim is not None else y.unsqueeze(-1)
            result = x + y
            result = result.movedim(-1, 0)
            return result, 0

        result = torch.vmap(f)(x, y)
        self.assertTrue(called)
        self.assertEqual(result, x + y)

    @skipIfTorchDynamo("Expected to fail due to no FakeTensor support; not a bug")
    def test_library_register_vmap_register_multiple_times_2(self):
        @torch.library.custom_op("mylib::f", mutates_args=())
        def f(x: Tensor, y: Tensor) -> Tensor:
            return x * y

        called = False

        @torch.library.register_vmap("mylib::f")
        def fvmap(info, in_dims, x, y):
            nonlocal called
            called = True
            x_bdim, y_bdim = in_dims
            x = x.movedim(x_bdim, -1) if x_bdim is not None else x.unsqueeze(-1)
            y = y.movedim(y_bdim, -1) if y_bdim is not None else y.unsqueeze(-1)
            result = x * y
            result = result.movedim(-1, 0)
            return result, 0

        x = torch.randn(2, 2)
        y = torch.randn(2, 2)

        result = torch.vmap(f)(x, y)
        self.assertTrue(called)
        self.assertEqual(result, x * y)
        called = False

        @torch.library.register_vmap("mylib::f")
        def fvmap2(info, in_dims, x, y):
            nonlocal called
            called = True
            x_bdim, y_bdim = in_dims
            x = x.movedim(x_bdim, -1) if x_bdim is not None else x.unsqueeze(-1)
            y = y.movedim(y_bdim, -1) if y_bdim is not None else y.unsqueeze(-1)
            result = x + y
            result = result.movedim(-1, 0)
            return result, 0

        result = torch.vmap(f)(x, y)
        self.assertTrue(called)
        self.assertEqual(result, x + y)

    @skipIfTorchDynamo("Skip due to sys.refcount")
    def test_any_requires_grad(self):
        test_fn = torch._C._any_requires_grad
        # Regression test on not leaking kwargs
        t = torch.randn(2, 2)
        t_refcount = sys.getrefcount(t)
        test_fn(t, a=t)
        self.assertEqual(sys.getrefcount(t), t_refcount)

        self.assertTrue(
            test_fn(
                torch.zeros(1, requires_grad=True), torch.ones(1, requires_grad=True)
            )
        )
        self.assertFalse(test_fn(torch.ones(1), torch.zeros(1)))
        self.assertTrue(
            test_fn(
                [torch.zeros(1, requires_grad=True), torch.ones(1, requires_grad=True)]
            )
        )
        # _C_any_requires_grad supports only List[Tensor] in args, not List[List[Tensor]]
        self.assertFalse(test_fn([[torch.zeros(1, requires_grad=True)]], torch.ones(1)))
        self.assertFalse(test_fn([torch.zeros(1), torch.ones(1)]))
        self.assertTrue(test_fn(torch.zeros(1), a=torch.ones(1, requires_grad=True)))
        self.assertFalse(test_fn(torch.zeros(1), a=torch.ones(1)))
        self.assertTrue(
            test_fn([torch.zeros(1, requires_grad=True), torch.ones(1)], torch.zeros(1))
        )
        self.assertFalse(test_fn([torch.zeros(1), torch.ones(1)], torch.zeros(1)))

    @skipIfTorchDynamo("Skip due to sys.refcount")
    def test_any_output_is_alias_to_input_or_output(self):
        test_fn = torch._C._any_output_is_alias_to_input_or_output
        # Regression test on not leaking kwargs
        t = torch.randn(2, 2)
        t_refcount = sys.getrefcount(t)
        test_fn((t,), {"a": t}, ())
        assert sys.getrefcount(t) == t_refcount

        x = torch.randn(2, 2)
        y = torch.randn(2, 2)
        self.assertTrue(
            test_fn(
                (x,),
                {},
                (x.t(),),
            )
        )
        self.assertFalse(test_fn((x,), None, (2 * x,)))
        self.assertTrue(
            test_fn(
                (),
                {"a": x.view(-1)},
                (x,),
            )
        )
        self.assertTrue(
            test_fn(
                (),
                {"a": x.view(-1)},
                (x.t(),),
            )
        )
        self.assertTrue(test_fn((y,), {}, (y[1:],)))
        self.assertFalse(
            test_fn(
                (x,),
                {"a": x},
                (),
            )
        )
        self.assertFalse(
            test_fn(
                (torch.tensor([]),),
                {},
                (torch.tensor([]),),
            )
        )
        self.assertTrue(
            test_fn(
                ([x], x + 1),
                {},
                (x.t(),),
            )
        )
        self.assertTrue(
            test_fn(
                ([x], x + 1),
                {},
                ([x.t()], x + 1),
            )
        )
        self.assertTrue(
            test_fn(
                ([x], x),
                {},
                ([x.t()], x + 1),
            )
        )
        self.assertTrue(
            test_fn(
                ([x, 1], x),
                {},
                ([x.t()], x + 1),
            )
        )
        self.assertTrue(
            test_fn(
                ([[x]], x),
                {},
                ([x.t()], x + 1),
            )
        )
        self.assertTrue(
            test_fn(
                ([[1, x], 2], 3),
                {},
                ([x.t()], x + 1),
            )
        )

    def test_library_get_kernel(self):
        """Test registering a custom kernel, using it, then deregistering and verifying error."""

        # Register a dummy kernel for arange to the CPU key that returns a tensor of ones
        def dummy_arange_cpu(
            dispatch_keys,
            start,
            end,
            dtype=None,
            layout=torch.strided,
            device=None,
            pin_memory=False,
        ):
            size = max(0, int(end - start))
            return torch.ones(size, dtype=dtype, device=device)

        with torch.library._scoped_library("aten", "IMPL") as lib:
            lib.impl("arange.start", dummy_arange_cpu, "CPU", with_keyset=True)

            kernel = torch.library.get_kernel("aten::arange.start", "CPU")
            dispatch_keys = torch._C.DispatchKeySet(torch._C.DispatchKey.CPU)
            result = kernel.call_boxed(dispatch_keys, 0, 5)

            self.assertEqual(result, torch.ones(5))

        # The kernel should now be invalidated after exiting the scoped_library context
        with self.assertRaisesRegex(RuntimeError, "has been invalidated"):
            kernel.call_boxed(dispatch_keys, 0, 5)

    def test_library_get_kernel_with_conditional_dispatch(self):
        """Test registering a custom kernel with conditional dispatch logic."""

        def conditional_arange_cpu1(
            original_kernel,
            dispatch_keys,
            start,
            end,
            dtype=None,
            layout=torch.strided,
            device=None,
            pin_memory=False,
        ):
            # If end is even, use the original kernel, otherwise return ones tensor
            if end % 2 == 0:
                op_handle = torch.ops.aten.arange.start._handle
                return original_kernel.call_boxed(
                    dispatch_keys,
                    start,
                    end,
                    dtype=dtype,
                    layout=layout,
                    device=device,
                    pin_memory=pin_memory,
                )
            else:
                size = max(0, int(end - start))
                return torch.ones(size, dtype=dtype, device=device)

        def conditional_arange_cpu2(
            original_kernel,
            dispatch_keys,
            start,
            end,
            dtype=None,
            layout=torch.strided,
            device=None,
            pin_memory=False,
        ):
            # If start is even, use the original kernel, otherwise return twos tensor
            if start % 2 == 0:
                op_handle = torch.ops.aten.arange.start._handle
                return original_kernel.call_boxed(
                    dispatch_keys,
                    start,
                    end,
                    dtype=dtype,
                    layout=layout,
                    device=device,
                    pin_memory=pin_memory,
                )
            else:
                size = max(0, int(end - start))
                return torch.empty(size, dtype=dtype, device=device).fill_(2)

        original_kernel = torch.library.get_kernel("aten::arange.start", "CPU")
        expected_result1, expected_result2 = torch.ones(5), torch.arange(0, 6)
        expected_result3, expected_result4, expected_result5 = (
            torch.ones(5),
            torch.arange(0, 6),
            torch.ones(5).fill_(2),
        )

        with torch.library._scoped_library("aten", "IMPL") as lib2:
            with torch.library._scoped_library("aten", "IMPL") as lib1:
                lib1.impl(
                    "arange.start",
                    partial(conditional_arange_cpu1, original_kernel),
                    "CPU",
                    with_keyset=True,
                )

                self.assertEqual(torch.arange(0, 5), expected_result1)
                self.assertEqual(torch.arange(0, 6), expected_result2)
                new_original_kernel = torch.library.get_kernel(
                    "aten::arange.start", "CPU"
                )
                lib2.impl(
                    "arange.start",
                    partial(conditional_arange_cpu2, new_original_kernel),
                    "CPU",
                    allow_override=True,
                    with_keyset=True,
                )

                self.assertEqual(torch.arange(0, 5), expected_result3)
                self.assertEqual(torch.arange(0, 6), expected_result4)
                self.assertEqual(torch.arange(1, 6), expected_result5)

            # The kernel should now be invalidated after destroying lib1
            with self.assertRaisesRegex(RuntimeError, "has been invalidated"):
                torch.arange(0, 5)

            # Should still work after destroying lib1
            self.assertEqual(torch.arange(1, 6), expected_result5)

    def test_library_get_kernel_invalid(self):
        """Test that get_kernel raises an error when no kernel is available."""
        with torch.library._scoped_library("test_invalid_kernel", "DEF") as lib:
            lib.define("cpu_only_op(Tensor x) -> Tensor")
            lib.impl("cpu_only_op", lambda x: x * 2, "CPU")

            cpu_kernel = torch.library.get_kernel(
                "test_invalid_kernel::cpu_only_op", "CPU"
            )
            self.assertIsNotNone(cpu_kernel)

            # CUDA should fail at the isValid() check since no CUDA kernel exists
            with self.assertRaisesRegex(
                RuntimeError, "no kernel for CUDA for test_invalid_kernel::cpu_only_op"
            ):
                torch.library.get_kernel("test_invalid_kernel::cpu_only_op", "CUDA")


class MiniOpTestOther(CustomOpTestCaseBase):
    test_ns = "mini_op_test"

    def test_nonzero_again(self):
        x = torch.tensor([0, 1, 2, 0, 0])
        y = torch.ops.aten.nonzero.default(x)
        self.assertEqual(y, torch.tensor([[1], [2]]))


optests.generate_opcheck_tests(
    MiniOpTest,
    ["aten", "mini_op_test"],
    get_file_path_2(os.path.dirname(__file__), "minioptest_failures_dict.json"),
    additional_decorators={
        "test_pt2_compliant_tag_mini_op_test_no_abstract": [unittest.expectedFailure]
    },
    test_utils=optests.generate_tests.DEPRECATED_DEFAULT_TEST_UTILS,
)

optests.generate_opcheck_tests(
    MiniOpTestOther,
    ["aten", "mini_op_test"],
    get_file_path_2(os.path.dirname(__file__), "minioptest_failures_dict.json"),
    test_utils=optests.generate_tests.DEPRECATED_DEFAULT_TEST_UTILS,
)


class TestGenerateOpcheckTests(CustomOpTestCaseBase):
    def test_MiniOpTest(self):
        for orig_test in ["test_mm", "test_nonzero"]:
            for (
                test
            ) in torch.testing._internal.optests.generate_tests.DEFAULT_TEST_UTILS:
                expected_test = f"{test}__{orig_test}"
                self.assertTrue(hasattr(MiniOpTest, expected_test), msg=expected_test)

    def test_generate_repro_save_data(self):
        from torch.testing._internal.optests.generate_tests import generate_repro

        args = (torch.ones(2, 2),)
        kwargs = {"mat2": torch.zeros(2, 2)}
        actual = generate_repro(
            "test_schema",
            torch.ops.aten.sin.default,
            args,
            kwargs,
            save_data=True,
            dry_run=True,
        )
        actual = re.sub(r"torch.load\(\".*\.pt\"\)", 'torch.load("repro.pt")', actual)
        self.assertExpectedInline(
            actual,
            """\
# =========================================================
# BEGIN REPRO SCRIPT
# =========================================================
import torch
from torch.testing._internal.optests import opcheck

# Make sure you have loaded the library that contains the op
# via an import or torch.ops.load_library(...)
op = torch.ops.aten.sin.default

args, kwargs = torch.load("repro.pt")
opcheck(op, args, kwargs, test_utils="test_schema")
# =========================================================
# END REPRO SCRIPT
# =========================================================
""",
        )

    def test_generate_repro_no_save_data(self):
        from torch.testing._internal.optests.generate_tests import generate_repro

        args = (torch.ones(2, 2),)
        kwargs = {"mat2": torch.zeros(2, 2)}
        actual = generate_repro(
            "test_schema",
            torch.ops.aten.sin.default,
            args,
            kwargs,
            save_data=False,
            dry_run=True,
        )
        self.assertExpectedInline(
            actual,
            """\
# =========================================================
# BEGIN REPRO SCRIPT
# =========================================================
import torch
from torch.testing._internal.optests import opcheck

# Make sure you have loaded the library that contains the op
# via an import or torch.ops.load_library(...)
op = torch.ops.aten.sin.default

# If you rerun your test with PYTORCH_OPCHECK_PRINT_BETTER_REPRO=1
# we will fill them in same (args, kwargs) as in your test
args = ()  # args to the operator
kwargs = {}  # kwargs to the operator
opcheck(op, args, kwargs, test_utils="test_schema")
# =========================================================
# END REPRO SCRIPT
# =========================================================
""",
        )

    def test_failures_dict_validation(self):
        from torch.testing._internal.optests.generate_tests import (
            FailuresDict,
            validate_failures_dict_structure,
        )

        failures = {
            "mini_op_test::incorrect_schema": {
                "MiniOpTest.test_aot_dispatch_dynamic__test_delayed_error": {
                    "comment": "",
                    "status": "success",
                }
            }
        }
        with self.assertRaisesRegex(RuntimeError, "got status=success"):
            validate_failures_dict_structure(
                FailuresDict("", failures),
                torch.testing._internal.optests.generate_tests.DEFAULT_TEST_UTILS,
                MiniOpTest,
            )

        failures = {
            "mini_op_test::incorrect_schema": {
                "MiniOpTest.test_aot_dispatch__test_delayed_error": {
                    "comment": "",
                    "status": "xfail",
                },
            }
        }
        with self.assertRaisesRegex(RuntimeError, "should begin with one of"):
            validate_failures_dict_structure(
                FailuresDict("", failures),
                torch.testing._internal.optests.generate_tests.DEFAULT_TEST_UTILS,
                MiniOpTest,
            )

        failures = {
            "mini_op_test::incorrect_schema": {
                "MiniOpTest.test_aot_dispatch_dynamic__test_delayed_error_nopenopenope": {
                    "comment": "",
                    "status": "xfail",
                },
            }
        }
        with self.assertRaisesRegex(RuntimeError, "does not exist on the TestCase"):
            validate_failures_dict_structure(
                FailuresDict("", failures),
                torch.testing._internal.optests.generate_tests.DEFAULT_TEST_UTILS,
                MiniOpTest,
            )

    def test_dont_generate_decorator(self):
        self.assertTrue(hasattr(MiniOpTest, "test_dont_generate"))
        self.assertFalse(hasattr(MiniOpTest, "test_schema__test_dont_generate"))

    def test_opcheck(self):
        x = torch.randn(3, requires_grad=True)
        with self.assertRaisesRegex(ValueError, "OpOverload"):
            torch.library.opcheck(torch.sin, (x,))
        with self.assertRaisesRegex(ValueError, "test_utils to be subset of"):
            torch.library.opcheck(torch.ops.aten.sin.default, (x,), test_utils="blah")
        result = torch.library.opcheck(torch.ops.aten.sin.default, (x,))

        self.assertEqual(
            result,
            {
                "test_schema": "SUCCESS",
                "test_autograd_registration": "SUCCESS",
                "test_faketensor": "SUCCESS",
                "test_aot_dispatch_dynamic": "SUCCESS",
            },
        )

        result = torch.library.opcheck(
            torch.ops.aten.sin.default, (x,), test_utils="test_schema"
        )
        self.assertEqual(result, {"test_schema": "SUCCESS"})

        result = torch.library.opcheck(
            torch.ops.aten.sin.default,
            (x,),
            test_utils=["test_schema", "test_faketensor"],
        )
        self.assertEqual(
            result,
            {
                "test_schema": "SUCCESS",
                "test_faketensor": "SUCCESS",
            },
        )

    def test_opcheck_customopdef(self):
        sample_inputs = [
            (torch.randn(3),),
            (torch.randn(3, requires_grad=True),),
        ]
        if torch.cuda.is_available():
            sample_inputs.extend(
                [
                    (torch.randn(3, device="cuda"),),
                    (torch.randn(3, device="cuda", requires_grad=True),),
                ]
            )
        for args in sample_inputs:
            torch.library.opcheck(custom_op_db.numpy_cube, args)

    def test_is_inside_opcheck_mode(self):
        self.assertFalse(optests.is_inside_opcheck_mode())
        with optests.generate_tests.OpCheckMode(
            ["foo"], "bar", lambda x: x, None, "baz", "brr"
        ):
            self.assertTrue(optests.is_inside_opcheck_mode())

    def test_opcheck_bad_op(self):
        op = op_with_incorrect_schema(self, "foo")
        x = torch.randn(3)
        with self.assertRaisesRegex(Exception, "is not defined to alias output"):
            torch.library.opcheck(op, (x,))

        result = torch.library.opcheck(op, (x,), raise_exception=False)
        self.assertTrue(isinstance(result["test_schema"], RuntimeError))
        del result["test_schema"]
        self.assertEqual(
            result,
            {
                "test_autograd_registration": "SUCCESS",
                "test_faketensor": "SUCCESS",
                "test_aot_dispatch_dynamic": "SUCCESS",
            },
        )

    def test_opcheck_does_not_require_extra_deps(self):
        # torch.testing._internal.common_utils comes with a lot of additional
        # test-time dependencies. Since opcheck is public API, it should be
        # usable only with pytorch install-time dependencies.
        cmd = [
            sys.executable,
            "-c",
            "import torch; import sys; \
               x = torch.randn(3, requires_grad=True); \
               torch.library.opcheck(torch.ops.aten.sin.default, (x,)); \
               assert 'expecttest' not in sys.modules; \
               assert 'torch.testing._internal.common_utils' not in sys.modules",
        ]
        subprocess.check_output(cmd, shell=False)


class TestTypeConversion(TestCase):
    """In infer_schema(), we try to suggest a correct type when the type annotation is wrong."""

    def setUp(self):
        self.supported_base_types = [
            int,
            float,
            bool,
            str,
            torch.device,
            torch.Tensor,
            torch.dtype,
            torch.types.Number,
        ]

    def test_simple_tuple(self):
        self.assertEqual(list, tuple_to_list(Tuple))

    def test_supported_types(self):
        for t in self.supported_base_types:
            result_type = tuple_to_list(Tuple[t, t, t])
            self.assertEqual(result_type, list[t])

            result_type = tuple_to_list(Tuple[t])
            self.assertEqual(result_type, list[t])

    def test_optional(self):
        for t in self.supported_base_types:
            result_type = tuple_to_list(Tuple[t, Optional[t]])
            self.assertEqual(result_type, list[Optional[t]])

            result_type = tuple_to_list(Tuple[t, t, Optional[t]])
            self.assertEqual(result_type, list[Optional[t]])

            result_type = tuple_to_list(Tuple[t, ...])
            self.assertEqual(result_type, list[t])

    def test_mixed_types(self):
        result_type = tuple_to_list(Tuple[int, float])
        self.assertEqual(result_type, list[typing.Union[int, float]])

        result_type = tuple_to_list(Tuple[int, float, str])
        self.assertEqual(result_type, list[typing.Union[int, float, str]])


class TestOpProfiles(TestCase):
    def get_sample_op_profile(self, opname) -> dict[str, set[OpProfile]]:
        return {
            opname: {
                OpProfile(
                    args_profile=(
                        TensorMetadata(
                            rank=2,
                            dtype=torch.float32,
                            device=torch.device("cpu"),
                            layout=torch.strided,
                        ),
                        TensorMetadata(
                            rank=2,
                            dtype=torch.float32,
                            device=torch.device("cpu"),
                            layout=torch.strided,
                        ),
                    ),
                    out_profile=TensorMetadata(
                        rank=2,
                        dtype=torch.float32,
                        device=torch.device("cpu"),
                        layout=torch.strided,
                    ),
                )
            }
        }

    def test_fake_registration(self):
        fm = torch._subclasses.FakeTensorMode(
            shape_env=ShapeEnv(allow_dynamic_output_shape_ops=True)
        )
        t1 = fm.from_tensor(torch.ones(3, 3))
        t2 = fm.from_tensor(torch.ones(3, 3))

        op_profiles = self.get_sample_op_profile("mylib.foo2.default")

        with torch.library._scoped_library("mylib", "FRAGMENT") as lib:
            torch.library.define(
                "mylib::foo2",
                "(Tensor a, Tensor b) -> Tensor",
                tags=torch.Tag.pt2_compliant_tag,
                lib=lib,
            )

            @torch.library.impl("mylib::foo2", "cpu", lib=lib)
            def foo_impl(a, b):
                return a + b

            with (
                self.assertRaisesRegex(
                    torch._subclasses.fake_tensor.UnsupportedOperatorException,
                    "mylib.foo2.default",
                ),
                fm,
            ):
                torch.ops.mylib.foo2(t1, t2)

            with (
                torch._library.fake_profile.unsafe_generate_fake_kernels(op_profiles),
                fm,
            ):
                torch.ops.mylib.foo2(t1, t2)

                with self.assertRaisesRegex(MissingOpProfile, "mylib::foo2"):
                    torch.ops.mylib.foo2(torch.ones(3, 3, 3), torch.ones(3, 3, 3))

            with (
                self.assertRaisesRegex(
                    torch._subclasses.fake_tensor.UnsupportedOperatorException,
                    "mylib.foo2.default",
                ),
                fm,
            ):
                torch.ops.mylib.foo2(t1, t2)

    def test_duplicate_registration_impl(self):
        fm = torch._subclasses.FakeTensorMode(
            shape_env=ShapeEnv(allow_dynamic_output_shape_ops=True)
        )
        t1 = fm.from_tensor(torch.ones(3, 3))
        t2 = fm.from_tensor(torch.ones(3, 3))

        op_profiles = self.get_sample_op_profile("mylib.foo3.default")

        with torch.library._scoped_library("mylib", "FRAGMENT") as lib:
            torch.library.define(
                "mylib::foo3",
                "(Tensor a, Tensor b) -> Tensor",
                tags=torch.Tag.pt2_compliant_tag,
                lib=lib,
            )

            @torch.library.impl("mylib::foo3", "cpu", lib=lib)
            def foo3_impl(a, b):
                return a + b

            @torch.library.register_fake("mylib::foo3", lib=lib)
            def foo3_impl_fake(a, b):
                return (a + b).to(dtype=torch.bfloat16)

            with fm:
                self.assertEqual(torch.ops.mylib.foo3(t1, t2).dtype, torch.bfloat16)

            with torch._library.fake_profile.unsafe_generate_fake_kernels(op_profiles):
                with fm:
                    self.assertEqual(torch.ops.mylib.foo3(t1, t2).dtype, torch.float32)

            with fm:
                self.assertEqual(torch.ops.mylib.foo3(t1, t2).dtype, torch.bfloat16)

    def test_duplicate_registration_custom_op(self):
        fm = torch._subclasses.FakeTensorMode(
            shape_env=ShapeEnv(allow_dynamic_output_shape_ops=True)
        )
        t1 = fm.from_tensor(torch.ones(3, 3))
        t2 = fm.from_tensor(torch.ones(3, 3))

        op_profiles = self.get_sample_op_profile("mylib.foo1.default")

        @torch.library.custom_op("mylib::foo1", mutates_args=())
        def foo_impl(a: torch.Tensor, b: torch.Tensor) -> torch.Tensor:
            return a + b

        @torch.library.register_fake("mylib::foo1")
        def foo_impl_fake(a, b):
            return torch.empty_like(a, dtype=torch.bfloat16)

        with fm:
            self.assertEqual(torch.ops.mylib.foo1(t1, t2).dtype, torch.bfloat16)

        with torch._library.fake_profile.unsafe_generate_fake_kernels(op_profiles):
            with fm:
                self.assertEqual(torch.ops.mylib.foo1(t1, t2).dtype, torch.float32)

        with fm:
            self.assertEqual(torch.ops.mylib.foo1(t1, t2).dtype, torch.bfloat16)

    def test_yaml(self):
        op_profiles = self.get_sample_op_profile("mylib.foo.default")
        yaml_str = generate_yaml_from_profiles(op_profiles)
        loaded = read_profiles_from_yaml(yaml_str)
        self.assertEqual(op_profiles, loaded)

    @unittest.skipIf(IS_WINDOWS, "Windows not supported for this test")
    def test_save_to_file(self):
        op_profile = self.get_sample_op_profile("mylib.foo.default")

        # Saving with buffer
        buffer = io.BytesIO()
        save_op_profiles(op_profile, buffer)
        buffer.seek(0)
        loaded = load_op_profiles(buffer)
        self.assertEqual(op_profile, loaded)

        # Saving with file
        with tempfile.NamedTemporaryFile() as f:
            save_op_profiles(op_profile, f.name)
            f.seek(0)
            loaded = load_op_profiles(f.name)
            self.assertEqual(op_profile, loaded)

        # Saving with Path
        with TemporaryFileName() as fname:
            path = Path(fname)
            save_op_profiles(op_profile, path)
            loaded = load_op_profiles(path)
            self.assertEqual(op_profile, loaded)

    def test_version(self):
        op_profiles = self.get_sample_op_profile("mylib.foo.default")
        yaml_str = generate_yaml_from_profiles(op_profiles)
        loaded = yaml.safe_load(yaml_str)
        loaded["torch_version"] = "2.7"
        yaml_str = yaml.dump(loaded, sort_keys=False)
        with self.assertRaisesRegex(RuntimeError, "Unable to load outdated profile"):
            loaded = read_profiles_from_yaml(yaml_str)


only_for = ("cpu", "cuda", "xpu")
instantiate_device_type_tests(
    TestCustomOpTesting, globals(), only_for=only_for, allow_xpu=True
)
instantiate_parametrized_tests(TestCustomOp)
instantiate_parametrized_tests(TestCustomOpAPI)

if __name__ == "__main__":
    run_tests()
